summaryrefslogtreecommitdiffstats
path: root/components/pm-subscription-handler/tests
diff options
context:
space:
mode:
authorefiacor <fiachra.corcoran@est.tech>2020-06-09 19:20:22 +0100
committerefiacor <fiachra.corcoran@est.tech>2020-06-17 14:57:29 +0100
commit80ff14860e3b8a7a2c29272c2c10c1e830c2141d (patch)
tree7e97d2b90e1dd5b6765e0bc88932c53197a107cf /components/pm-subscription-handler/tests
parent2760519436f78975f16f26c412e842e34b28d624 (diff)
[PMSH] Improve CBS data handling
# AppConfog object fetch priodically # AAI client to only fetch AAI data Signed-off-by: efiacor <fiachra.corcoran@est.tech> Change-Id: I78315f141c3bb7e8b0d9efa818d294415fa79918 Issue-ID: DCAEGEN2-2146
Diffstat (limited to 'components/pm-subscription-handler/tests')
-rwxr-xr-xcomponents/pm-subscription-handler/tests/test_aai_event_handler.py22
-rw-r--r--components/pm-subscription-handler/tests/test_aai_service.py20
-rwxr-xr-xcomponents/pm-subscription-handler/tests/test_controller.py17
-rwxr-xr-xcomponents/pm-subscription-handler/tests/test_exit_handler.py9
-rwxr-xr-xcomponents/pm-subscription-handler/tests/test_network_function.py2
-rw-r--r--components/pm-subscription-handler/tests/test_pmsh_utils.py85
-rw-r--r--components/pm-subscription-handler/tests/test_policy_response_handler.py63
-rwxr-xr-xcomponents/pm-subscription-handler/tests/test_subscription.py152
-rw-r--r--components/pm-subscription-handler/tests/test_subscription_handler.py76
9 files changed, 244 insertions, 202 deletions
diff --git a/components/pm-subscription-handler/tests/test_aai_event_handler.py b/components/pm-subscription-handler/tests/test_aai_event_handler.py
index f57df4cd..d366dac5 100755
--- a/components/pm-subscription-handler/tests/test_aai_event_handler.py
+++ b/components/pm-subscription-handler/tests/test_aai_event_handler.py
@@ -22,34 +22,36 @@ from unittest.mock import patch, Mock
from mod.aai_event_handler import process_aai_events
from mod.network_function import NetworkFunction, OrchestrationStatus
+from mod.pmsh_utils import AppConfig
class AAIEventHandlerTest(TestCase):
- def setUp(self):
+ @patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
+ def setUp(self, mock_get_pmsh_config):
with open(path.join(path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
- self.cbs_data_1 = json.load(data)
+ self.cbs_data = json.load(data)
+ mock_get_pmsh_config.return_value = self.cbs_data
+ self.app_conf = AppConfig()
with open(path.join(path.dirname(__file__), 'data/mr_aai_events.json'), 'r') as data:
self.mr_aai_events = json.load(data)["mr_response"]
- self.mock_sub = Mock(nfFilter={'swVersions': ['1.0.0', '1.0.1'],
- 'nfNames': ['^pnf.*', '^vnf.*']})
self.mock_mr_sub = Mock(get_from_topic=Mock(return_value=self.mr_aai_events))
self.mock_mr_pub = Mock()
self.mock_app = Mock()
+ @patch('mod.subscription.Subscription.process_subscription')
@patch('mod.aai_event_handler.NetworkFunction.delete')
@patch('mod.aai_event_handler.NetworkFunction.get')
- @patch('pmsh_service_main.AppConfig')
- def test_process_aai_update_and_delete_events(self, mock_app_conf, mock_nf_get, mock_nf_delete):
+ def test_process_aai_update_and_delete_events(self, mock_nf_get, mock_nf_delete,
+ mock_process_sub):
pnf_already_active = NetworkFunction(nf_name='pnf_already_active',
orchestration_status=OrchestrationStatus.ACTIVE.value)
mock_nf_get.side_effect = [None, pnf_already_active]
expected_nf_for_processing = NetworkFunction(
nf_name='pnf_newly_discovered', orchestration_status=OrchestrationStatus.ACTIVE.value)
- process_aai_events(self.mock_mr_sub, self.mock_sub,
- self.mock_mr_pub, self.mock_app, mock_app_conf)
+ process_aai_events(self.mock_mr_sub, self.mock_mr_pub, self.mock_app, self.app_conf)
- self.mock_sub.process_subscription.assert_called_once_with([expected_nf_for_processing],
- self.mock_mr_pub, mock_app_conf)
+ mock_process_sub.assert_called_once_with([expected_nf_for_processing],
+ self.mock_mr_pub, self.app_conf)
mock_nf_delete.assert_called_once_with(nf_name='pnf_to_be_deleted')
diff --git a/components/pm-subscription-handler/tests/test_aai_service.py b/components/pm-subscription-handler/tests/test_aai_service.py
index 9a3a1b69..9f370096 100644
--- a/components/pm-subscription-handler/tests/test_aai_service.py
+++ b/components/pm-subscription-handler/tests/test_aai_service.py
@@ -26,13 +26,15 @@ from requests import Session
import mod.aai_client as aai_client
from mod import create_app
+from mod.pmsh_utils import AppConfig
class AaiClientTestCase(TestCase):
- @patch('mod.update_config')
+ @patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
+ @patch('mod.update_logging_config')
@patch('mod.get_db_connection_url')
- def setUp(self, mock_get_db_url, mock_update_config):
+ def setUp(self, mock_get_db_url, mock_update_config, mock_get_pmsh_config):
mock_get_db_url.return_value = 'sqlite://'
self.env = EnvironmentVarGuard()
self.env.set('AAI_SERVICE_HOST', '1.2.3.4')
@@ -40,6 +42,8 @@ class AaiClientTestCase(TestCase):
self.env.set('LOGGER_CONFIG', os.path.join(os.path.dirname(__file__), 'log_config.yaml'))
with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
self.cbs_data = json.load(data)
+ mock_get_pmsh_config.return_value = self.cbs_data
+ self.app_conf = AppConfig()
with open(os.path.join(os.path.dirname(__file__), 'data/aai_xnfs.json'), 'r') as data:
self.aai_response_data = data.read()
self.app = create_app()
@@ -48,9 +52,9 @@ class AaiClientTestCase(TestCase):
def test_aai_client_get_pm_sub_data_success(self, mock_session):
mock_session.return_value.status_code = 200
mock_session.return_value.text = self.aai_response_data
- sub, xnfs = aai_client.get_pmsh_subscription_data(self.cbs_data)
- self.assertEqual(sub.subscriptionName, 'ExtraPM-All-gNB-R2B')
- self.assertEqual(sub.administrativeState, 'UNLOCKED')
+ xnfs = aai_client.get_pmsh_nfs_from_aai(self.app_conf)
+ self.assertEqual(self.app_conf.subscription.subscriptionName, 'ExtraPM-All-gNB-R2B')
+ self.assertEqual(self.app_conf.subscription.administrativeState, 'UNLOCKED')
self.assertEqual(len(xnfs), 3)
@patch.object(Session, 'put')
@@ -58,21 +62,21 @@ class AaiClientTestCase(TestCase):
mock_session.return_value.status_code = 404
with mock.patch('mod.aai_client._get_all_aai_nf_data', return_value=None):
with self.assertRaises(RuntimeError):
- aai_client.get_pmsh_subscription_data(self.cbs_data)
+ aai_client.get_pmsh_nfs_from_aai(self.cbs_data)
@responses.activate
def test_aai_client_get_all_aai_xnf_data_not_found(self):
responses.add(responses.PUT,
'https://1.2.3.4:8443/aai/v16/query?format=simple&nodesOnly=true',
json={'error': 'not found'}, status=404)
- self.assertIsNone(aai_client._get_all_aai_nf_data())
+ self.assertIsNone(aai_client._get_all_aai_nf_data(self.app_conf))
@responses.activate
def test_aai_client_get_all_aai_xnf_data_success(self):
responses.add(responses.PUT,
'https://1.2.3.4:8443/aai/v16/query?format=simple&nodesOnly=true',
json={'dummy_data': 'blah_blah'}, status=200)
- self.assertIsNotNone(aai_client._get_all_aai_nf_data())
+ self.assertIsNotNone(aai_client._get_all_aai_nf_data(self.app_conf))
def test_aai_client_get_aai_service_url_fail(self):
self.env.clear()
diff --git a/components/pm-subscription-handler/tests/test_controller.py b/components/pm-subscription-handler/tests/test_controller.py
index 8ca208ae..57230429 100755
--- a/components/pm-subscription-handler/tests/test_controller.py
+++ b/components/pm-subscription-handler/tests/test_controller.py
@@ -26,13 +26,16 @@ from requests import Session
from mod import aai_client, create_app, db
from mod.api.controller import status, get_all_sub_to_nf_relations
from mod.network_function import NetworkFunction
+from mod.pmsh_utils import AppConfig
class ControllerTestCase(unittest.TestCase):
- @patch('mod.update_config')
+
+ @patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
+ @patch('mod.update_logging_config')
@patch('mod.get_db_connection_url')
@patch.object(Session, 'put')
- def setUp(self, mock_session, mock_get_db_url, mock_update_config):
+ def setUp(self, mock_session, mock_get_db_url, mock_update_config, mock_get_pmsh_config):
mock_get_db_url.return_value = 'sqlite://'
with open(os.path.join(os.path.dirname(__file__), 'data/aai_xnfs.json'), 'r') as data:
self.aai_response_data = data.read()
@@ -43,8 +46,10 @@ class ControllerTestCase(unittest.TestCase):
self.env.set('AAI_SERVICE_PORT', '8443')
self.env.set('LOGGER_CONFIG', os.path.join(os.path.dirname(__file__), 'log_config.yaml'))
with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
- self.cbs_data_1 = json.load(data)
- self.sub_1, self.xnfs = aai_client.get_pmsh_subscription_data(self.cbs_data_1)
+ self.cbs_data = json.load(data)
+ mock_get_pmsh_config.return_value = self.cbs_data
+ self.app_conf = AppConfig()
+ self.xnfs = aai_client.get_pmsh_nfs_from_aai(self.app_conf)
self.nf_1 = NetworkFunction(nf_name='pnf_1', orchestration_status='Inventoried')
self.nf_2 = NetworkFunction(nf_name='pnf_2', orchestration_status='Active')
self.app = create_app()
@@ -61,9 +66,9 @@ class ControllerTestCase(unittest.TestCase):
self.assertEqual(status()['status'], 'healthy')
def test_get_all_sub_to_nf_relations(self):
- self.sub_1.create()
+ self.app_conf.subscription.create()
for nf in [self.nf_1, self.nf_2]:
- self.sub_1.add_network_function_to_subscription(nf)
+ self.app_conf.subscription.add_network_function_to_subscription(nf)
all_subs = get_all_sub_to_nf_relations()
self.assertEqual(len(all_subs[0]['network_functions']), 2)
self.assertEqual(all_subs[0]['subscription_name'], 'ExtraPM-All-gNB-R2B')
diff --git a/components/pm-subscription-handler/tests/test_exit_handler.py b/components/pm-subscription-handler/tests/test_exit_handler.py
index 0cce1db9..ac1e15c6 100755
--- a/components/pm-subscription-handler/tests/test_exit_handler.py
+++ b/components/pm-subscription-handler/tests/test_exit_handler.py
@@ -30,10 +30,8 @@ from mod.subscription import AdministrativeState
class ExitHandlerTests(TestCase):
- @patch('pmsh_service_main.ConfigHandler')
@patch('pmsh_service_main.create_app')
@patch('pmsh_service_main.db')
- @patch('pmsh_service_main.aai.get_pmsh_subscription_data')
@patch('pmsh_service_main.AppConfig')
@patch('pmsh_service_main.Subscription')
@patch('pmsh_service_main.launch_api_server')
@@ -41,10 +39,9 @@ class ExitHandlerTests(TestCase):
@patch.object(PeriodicTask, 'start')
@patch.object(PeriodicTask, 'cancel')
def test_terminate_signal_success(self, mock_task_cancel, mock_task_start, mock_sub_handler,
- mock_launch_api_server, mock_sub, mock_app_conf, mock_aai,
- mock_db, mock_app, mock_config_handler):
+ mock_launch_api_server, mock_sub, mock_app_conf,
+ mock_db, mock_app):
pid = os.getpid()
- mock_aai.return_value = [Mock(), Mock()]
mock_db.get_app.return_value = Mock()
mock_sub.administrativeState = AdministrativeState.UNLOCKED.value
@@ -67,7 +64,7 @@ class ExitHandlerTests(TestCase):
pmsh_service_main.main()
- self.assertEqual(3, mock_task_cancel.call_count)
+ self.assertEqual(4, mock_task_cancel.call_count)
self.assertTrue(ExitHandler.shutdown_signal_received)
self.assertEqual(1, mock_sub.process_subscription.call_count)
self.assertEqual(mock_sub.administrativeState, AdministrativeState.LOCKED.value)
diff --git a/components/pm-subscription-handler/tests/test_network_function.py b/components/pm-subscription-handler/tests/test_network_function.py
index a5324bd1..86baef83 100755
--- a/components/pm-subscription-handler/tests/test_network_function.py
+++ b/components/pm-subscription-handler/tests/test_network_function.py
@@ -27,7 +27,7 @@ from mod.subscription import Subscription
class NetworkFunctionTests(TestCase):
- @patch('mod.update_config')
+ @patch('mod.update_logging_config')
@patch('mod.get_db_connection_url')
def setUp(self, mock_get_db_url, mock_update_config):
mock_get_db_url.return_value = 'sqlite://'
diff --git a/components/pm-subscription-handler/tests/test_pmsh_utils.py b/components/pm-subscription-handler/tests/test_pmsh_utils.py
index 0c3b2d28..cfb78def 100644
--- a/components/pm-subscription-handler/tests/test_pmsh_utils.py
+++ b/components/pm-subscription-handler/tests/test_pmsh_utils.py
@@ -23,63 +23,80 @@ from unittest.mock import patch
import responses
from requests import Session
+from tenacity import RetryError
from mod import db, get_db_connection_url, create_app
from mod.pmsh_utils import AppConfig
-from mod.subscription import Subscription
class PmshUtilsTestCase(TestCase):
- @patch('mod.update_config')
+ @patch('mod.update_logging_config')
@patch('mod.create_app')
@patch('mod.get_db_connection_url')
def setUp(self, mock_get_db_url, mock_app, mock_update_config):
mock_get_db_url.return_value = 'sqlite://'
with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
self.cbs_data = json.load(data)
- self.app_conf = AppConfig(**self.cbs_data['config'])
- self.sub = Subscription(**self.cbs_data['policy']['subscription'])
self.env = EnvironmentVarGuard()
- self.env.set('TESTING', 'True')
self.env.set('LOGGER_CONFIG', os.path.join(os.path.dirname(__file__), 'log_config.yaml'))
- self.policy_mr_sub = self.app_conf.get_mr_sub('policy_pm_subscriber')
self.mock_app = mock_app
self.app = create_app()
self.app_context = self.app.app_context()
self.app_context.push()
db.create_all()
- def test_utils_get_mr_sub(self):
+ @patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
+ def test_utils_get_mr_sub(self, mock_get_pmsh_config):
+ mock_get_pmsh_config.return_value = self.cbs_data
+ self.app_conf = AppConfig()
mr_policy_sub = self.app_conf.get_mr_sub('policy_pm_subscriber')
self.assertTrue(mr_policy_sub.aaf_id, 'dcae@dcae.onap.org')
- def test_utils_get_mr_sub_fails_with_invalid_name(self):
+ @patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
+ def test_utils_get_mr_sub_fails_with_invalid_name(self, mock_get_pmsh_config):
+ mock_get_pmsh_config.return_value = self.cbs_data
+ self.app_conf = AppConfig()
with self.assertRaises(KeyError):
self.app_conf.get_mr_sub('invalid_sub')
- def test_utils_get_mr_pub(self):
+ @patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
+ def test_utils_get_mr_pub(self, mock_get_pmsh_config):
+ mock_get_pmsh_config.return_value = self.cbs_data
+ self.app_conf = AppConfig()
mr_policy_pub = self.app_conf.get_mr_pub('policy_pm_publisher')
self.assertTrue(mr_policy_pub.aaf_pass, 'demo123456!')
- def test_utils_get_mr_pub_fails_with_invalid_name(self):
+ @patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
+ def test_utils_get_mr_pub_fails_with_invalid_name(self, mock_get_pmsh_config):
+ mock_get_pmsh_config.return_value = self.cbs_data
+ self.app_conf = AppConfig()
with self.assertRaises(KeyError):
self.app_conf.get_mr_pub('invalid_pub')
- def test_utils_get_cert_data(self):
+ @patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
+ def test_utils_get_cert_data(self, mock_get_pmsh_config):
+ mock_get_pmsh_config.return_value = self.cbs_data
+ self.app_conf = AppConfig()
self.assertTrue(self.app_conf.cert_params, ('/opt/app/pm-mapper/etc/certs/cert.pem',
'/opt/app/pm-mapper/etc/certs/key.pem'))
+ @patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
@patch.object(Session, 'post')
- def test_mr_pub_publish_to_topic_success(self, mock_session):
+ def test_mr_pub_publish_to_topic_success(self, mock_session, mock_get_pmsh_config):
+ mock_get_pmsh_config.return_value = self.cbs_data
+ self.app_conf = AppConfig()
mock_session.return_value.status_code = 200
mr_policy_pub = self.app_conf.get_mr_pub('policy_pm_publisher')
with patch('requests.Session.post') as session_post_call:
mr_policy_pub.publish_to_topic({"dummy_val": "43c4ee19-6b8d-4279-a80f-c507850aae47"})
session_post_call.assert_called_once()
+ @patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
@responses.activate
- def test_mr_pub_publish_to_topic_fail(self):
+ def test_mr_pub_publish_to_topic_fail(self, mock_get_pmsh_config):
+ mock_get_pmsh_config.return_value = self.cbs_data
+ self.app_conf = AppConfig()
responses.add(responses.POST,
'https://node:30226/events/org.onap.dmaap.mr.PM_SUBSCRIPTIONS',
json={'error': 'Client Error'}, status=400)
@@ -87,28 +104,40 @@ class PmshUtilsTestCase(TestCase):
with self.assertRaises(Exception):
mr_policy_pub.publish_to_topic({"dummy_val": "43c4ee19-6b8d-4279-a80f-c507850aae47"})
- def test_mr_pub_publish_sub_event_data_success(self):
+ @patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
+ def test_mr_pub_publish_sub_event_data_success(self, mock_get_pmsh_config):
+ mock_get_pmsh_config.return_value = self.cbs_data
+ self.app_conf = AppConfig()
mr_policy_pub = self.app_conf.get_mr_pub('policy_pm_publisher')
with patch('mod.pmsh_utils._MrPub.publish_to_topic') as pub_to_topic_call:
- mr_policy_pub.publish_subscription_event_data(self.sub, 'pnf201', self.app_conf)
+ mr_policy_pub.publish_subscription_event_data(self.app_conf.subscription, 'pnf201',
+ self.app_conf)
pub_to_topic_call.assert_called_once()
+ @patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
@responses.activate
- def test_mr_sub_get_from_topic_success(self):
+ def test_mr_sub_get_from_topic_success(self, mock_get_pmsh_config):
+ mock_get_pmsh_config.return_value = self.cbs_data
+ self.app_conf = AppConfig()
+ policy_mr_sub = self.app_conf.get_mr_sub('policy_pm_subscriber')
responses.add(responses.GET,
'https://node:30226/events/org.onap.dmaap.mr.PM_SUBSCRIPTIONS/'
'dcae_pmsh_cg/1?timeout=1000',
json={"dummy_val": "43c4ee19-6b8d-4279-a80f-c507850aae47"}, status=200)
- mr_topic_data = self.policy_mr_sub.get_from_topic(1)
+ mr_topic_data = policy_mr_sub.get_from_topic(1)
self.assertIsNotNone(mr_topic_data)
+ @patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
@responses.activate
- def test_mr_sub_get_from_topic_fail(self):
+ def test_mr_sub_get_from_topic_fail(self, mock_get_pmsh_config):
+ mock_get_pmsh_config.return_value = self.cbs_data
+ self.app_conf = AppConfig()
+ policy_mr_sub = self.app_conf.get_mr_sub('policy_pm_subscriber')
responses.add(responses.GET,
'https://node:30226/events/org.onap.dmaap.mr.PM_SUBSCRIPTIONS/'
'dcae_pmsh_cg/1?timeout=1000',
json={"dummy_val": "43c4ee19-6b8d-4279-a80f-c507850aae47"}, status=400)
- mr_topic_data = self.policy_mr_sub.get_from_topic(1)
+ mr_topic_data = policy_mr_sub.get_from_topic(1)
self.assertIsNone(mr_topic_data)
def test_get_db_connection_url_success(self):
@@ -125,3 +154,21 @@ class PmshUtilsTestCase(TestCase):
self.env.set('PMSH_PG_PASSWORD', 'pass')
with self.assertRaises(Exception):
get_db_connection_url()
+
+ @patch('mod.logger.info')
+ @patch('mod.pmsh_utils.get_all')
+ def test_refresh_config_success(self, mock_cbs_client_get_all, mock_logger):
+ mock_cbs_client_get_all.return_value = self.cbs_data
+ self.app_conf = AppConfig()
+ self.app_conf.refresh_config()
+ mock_logger.assert_called_with('AppConfig data has been refreshed')
+
+ @patch('mod.logger.debug')
+ @patch('mod.pmsh_utils.get_all')
+ def test_refresh_config_fail(self, mock_cbs_client_get_all, mock_logger):
+ mock_cbs_client_get_all.return_value = self.cbs_data
+ self.app_conf = AppConfig()
+ mock_cbs_client_get_all.side_effect = Exception
+ with self.assertRaises(RetryError):
+ self.app_conf.refresh_config()
+ mock_logger.assert_called_with('Failed to refresh AppConfig data')
diff --git a/components/pm-subscription-handler/tests/test_policy_response_handler.py b/components/pm-subscription-handler/tests/test_policy_response_handler.py
index 6de33632..582f0bc8 100644
--- a/components/pm-subscription-handler/tests/test_policy_response_handler.py
+++ b/components/pm-subscription-handler/tests/test_policy_response_handler.py
@@ -24,34 +24,38 @@ from tenacity import stop_after_attempt
from mod.api.db_models import SubscriptionModel
from mod.network_function import NetworkFunction
-from mod.subscription import AdministrativeState, SubNfState
+from mod.pmsh_utils import AppConfig
from mod.policy_response_handler import PolicyResponseHandler, policy_response_handle_functions
+from mod.subscription import AdministrativeState, SubNfState
class PolicyResponseHandlerTest(TestCase):
+ @patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
@patch('mod.create_app')
@patch('mod.subscription.Subscription')
@patch('mod.pmsh_utils._MrSub')
- def setUp(self, mock_mr_sub, mock_sub, mock_app):
+ def setUp(self, mock_mr_sub, mock_sub, mock_app, mock_get_app_conf):
with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
self.cbs_data = json.load(data)
self.mock_policy_mr_sub = mock_mr_sub
- self.mock_sub = mock_sub
- self.mock_sub.subscriptionName = 'ExtraPM-All-gNB-R2B'
+ mock_get_app_conf.return_value = self.cbs_data
+ self.app_conf = AppConfig()
+ self.sub = self.app_conf.subscription
self.mock_app = mock_app
self.nf = NetworkFunction(nf_name='nf1')
self.policy_response_handler = PolicyResponseHandler(self.mock_policy_mr_sub,
- self.mock_sub.subscriptionName,
+ self.app_conf,
self.mock_app)
@patch('mod.network_function.NetworkFunction.delete')
def test_handle_response_locked_success(self, mock_delete):
with patch.dict(policy_response_handle_functions,
{AdministrativeState.LOCKED.value: {'success': mock_delete}}):
- self.policy_response_handler._handle_response(self.mock_sub.subscriptionName,
- AdministrativeState.LOCKED.value,
- self.nf.nf_name, 'success')
+ self.policy_response_handler._handle_response(
+ self.app_conf.subscription.subscriptionName,
+ AdministrativeState.LOCKED.value,
+ self.nf.nf_name, 'success')
mock_delete.assert_called()
@@ -59,34 +63,37 @@ class PolicyResponseHandlerTest(TestCase):
def test_handle_response_locked_failed(self, mock_update_sub_nf):
with patch.dict(policy_response_handle_functions,
{AdministrativeState.LOCKED.value: {'failed': mock_update_sub_nf}}):
- self.policy_response_handler._handle_response(self.mock_sub.subscriptionName,
- AdministrativeState.LOCKED.value,
- self.nf.nf_name, 'failed')
- mock_update_sub_nf.assert_called_with(subscription_name=self.mock_sub.subscriptionName,
- status=SubNfState.DELETE_FAILED.value,
- nf_name=self.nf.nf_name)
+ self.policy_response_handler._handle_response(
+ self.app_conf.subscription.subscriptionName,
+ AdministrativeState.LOCKED.value,
+ self.nf.nf_name, 'failed')
+ mock_update_sub_nf.assert_called_with(
+ subscription_name=self.app_conf.subscription.subscriptionName,
+ status=SubNfState.DELETE_FAILED.value, nf_name=self.nf.nf_name)
@patch('mod.subscription.Subscription.update_sub_nf_status')
def test_handle_response_unlocked_success(self, mock_update_sub_nf):
with patch.dict(policy_response_handle_functions,
{AdministrativeState.UNLOCKED.value: {'success': mock_update_sub_nf}}):
- self.policy_response_handler._handle_response(self.mock_sub.subscriptionName,
- AdministrativeState.UNLOCKED.value,
- self.nf.nf_name, 'success')
- mock_update_sub_nf.assert_called_with(subscription_name=self.mock_sub.subscriptionName,
- status=SubNfState.CREATED.value,
- nf_name=self.nf.nf_name)
+ self.policy_response_handler._handle_response(
+ self.app_conf.subscription.subscriptionName,
+ AdministrativeState.UNLOCKED.value,
+ self.nf.nf_name, 'success')
+ mock_update_sub_nf.assert_called_with(
+ subscription_name=self.app_conf.subscription.subscriptionName,
+ status=SubNfState.CREATED.value, nf_name=self.nf.nf_name)
@patch('mod.subscription.Subscription.update_sub_nf_status')
def test_handle_response_unlocked_failed(self, mock_update_sub_nf):
with patch.dict(policy_response_handle_functions,
{AdministrativeState.UNLOCKED.value: {'failed': mock_update_sub_nf}}):
- self.policy_response_handler._handle_response(self.mock_sub.subscriptionName,
- AdministrativeState.UNLOCKED.value,
- self.nf.nf_name, 'failed')
- mock_update_sub_nf.assert_called_with(subscription_name=self.mock_sub.subscriptionName,
- status=SubNfState.CREATE_FAILED.value,
- nf_name=self.nf.nf_name)
+ self.policy_response_handler._handle_response(
+ self.app_conf.subscription.subscriptionName,
+ AdministrativeState.UNLOCKED.value,
+ self.nf.nf_name, 'failed')
+ mock_update_sub_nf.assert_called_with(
+ subscription_name=self.app_conf.subscription.subscriptionName,
+ status=SubNfState.CREATE_FAILED.value, nf_name=self.nf.nf_name)
def test_handle_response_exception(self):
self.assertRaises(Exception, self.policy_response_handler._handle_response, 'sub1',
@@ -101,10 +108,8 @@ class PolicyResponseHandlerTest(TestCase):
mock_get_sub.return_value = SubscriptionModel(subscription_name='ExtraPM-All-gNB-R2B',
status=AdministrativeState.UNLOCKED.value)
self.policy_response_handler.poll_policy_topic()
-
self.mock_policy_mr_sub.get_from_topic.assert_called()
-
- mock_handle_response.assert_called_with(self.mock_sub.subscriptionName,
+ mock_handle_response.assert_called_with(self.app_conf.subscription.subscriptionName,
AdministrativeState.UNLOCKED.value, 'pnf300',
'success')
diff --git a/components/pm-subscription-handler/tests/test_subscription.py b/components/pm-subscription-handler/tests/test_subscription.py
index b50d9aaf..95db6b24 100755
--- a/components/pm-subscription-handler/tests/test_subscription.py
+++ b/components/pm-subscription-handler/tests/test_subscription.py
@@ -27,19 +27,19 @@ from tenacity import stop_after_attempt
import mod.aai_client as aai_client
from mod import db, create_app
from mod.api.db_models import NetworkFunctionModel
-from mod.network_function import NetworkFunction, NetworkFunctionFilter, OrchestrationStatus
+from mod.network_function import NetworkFunction, OrchestrationStatus
from mod.pmsh_utils import AppConfig
from mod.subscription import Subscription
class SubscriptionTest(TestCase):
- @patch('mod.update_config')
+ @patch('mod.update_logging_config')
@patch('mod.pmsh_utils._MrPub')
@patch('mod.pmsh_utils._MrSub')
@patch('mod.get_db_connection_url')
@patch.object(Session, 'put')
- @patch('pmsh_service_main.AppConfig')
- def setUp(self, mock_app_config, mock_session, mock_get_db_url,
+ @patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
+ def setUp(self, mock_get_pmsh_config, mock_session, mock_get_db_url,
mock_mr_sub, mock_mr_pub, mock_update_config):
mock_get_db_url.return_value = 'sqlite://'
with open(os.path.join(os.path.dirname(__file__), 'data/aai_xnfs.json'), 'r') as data:
@@ -51,21 +51,15 @@ class SubscriptionTest(TestCase):
self.env.set('AAI_SERVICE_PORT', '8443')
self.env.set('LOGGER_CONFIG', os.path.join(os.path.dirname(__file__), 'log_config.yaml'))
with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
- self.cbs_data_1 = json.load(data)
- with open(os.path.join(os.path.dirname(__file__),
- 'data/cbs_data_2.json'), 'r') as data:
- self.cbs_data_2 = json.load(data)
- self.sub_1, self.xnfs = aai_client.get_pmsh_subscription_data(self.cbs_data_1)
- self.sub_2, self.xnfs = aai_client.get_pmsh_subscription_data(self.cbs_data_2)
- self.nf_1 = NetworkFunction(nf_name='pnf_1', orchestration_status='Inventoried')
- self.nf_2 = NetworkFunction(nf_name='pnf_2', orchestration_status='Active')
- self.xnf_filter = NetworkFunctionFilter(**self.sub_1.nfFilter)
+ self.cbs_data = json.load(data)
+ mock_get_pmsh_config.return_value = self.cbs_data
+ self.app_conf = AppConfig()
+ self.xnfs = aai_client.get_pmsh_nfs_from_aai(self.app_conf)
self.mock_mr_sub = mock_mr_sub
self.mock_mr_pub = mock_mr_pub
self.app = create_app()
self.app_context = self.app.app_context()
self.app_context.push()
- self.mock_app_config = mock_app_config
db.create_all()
def tearDown(self):
@@ -74,21 +68,22 @@ class SubscriptionTest(TestCase):
self.app_context.pop()
def test_xnf_filter_true(self):
- self.assertTrue(self.xnf_filter.is_nf_in_filter('pnf1', OrchestrationStatus.ACTIVE.value))
+ self.assertTrue(self.app_conf.nf_filter.is_nf_in_filter('pnf1',
+ OrchestrationStatus.ACTIVE.value))
def test_xnf_filter_false(self):
- self.assertFalse(self.xnf_filter.is_nf_in_filter('PNF-33',
- OrchestrationStatus.ACTIVE.value))
+ self.assertFalse(self.app_conf.nf_filter.is_nf_in_filter('PNF-33',
+ OrchestrationStatus.ACTIVE.value))
def test_sub_measurement_group(self):
- self.assertEqual(len(self.sub_1.measurementGroups), 2)
+ self.assertEqual(len(self.app_conf.subscription.measurementGroups), 2)
def test_sub_file_location(self):
- self.assertEqual(self.sub_1.fileLocation, '/pm/pm.xml')
+ self.assertEqual(self.app_conf.subscription.fileLocation, '/pm/pm.xml')
def test_get_subscription(self):
sub_name = 'ExtraPM-All-gNB-R2B'
- self.sub_1.create()
+ self.app_conf.subscription.create()
new_sub = Subscription.get(sub_name)
self.assertEqual(sub_name, new_sub.subscription_name)
@@ -97,77 +92,66 @@ class SubscriptionTest(TestCase):
sub = Subscription.get(sub_name)
self.assertEqual(sub, None)
- def test_get_subscriptions(self):
- self.sub_1.create()
- self.sub_2.create()
- subs = self.sub_1.get_all()
-
- self.assertEqual(2, len(subs))
-
def test_get_nf_names_per_sub(self):
- self.sub_1.create()
- self.sub_1.add_network_function_to_subscription(self.nf_1)
- self.sub_1.add_network_function_to_subscription(self.nf_2)
- nfs = Subscription.get_nf_names_per_sub(self.sub_1.subscriptionName)
+ self.app_conf.subscription.create()
+ self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0])
+ self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[1])
+ nfs = Subscription.get_nf_names_per_sub(self.app_conf.subscription.subscriptionName)
self.assertEqual(2, len(nfs))
def test_create_existing_subscription(self):
- sub1 = self.sub_1.create()
- same_sub1 = self.sub_1.create()
+ sub1 = self.app_conf.subscription.create()
+ same_sub1 = self.app_conf.subscription.create()
self.assertEqual(sub1, same_sub1)
- self.assertEqual(1, len(self.sub_1.get_all()))
+ self.assertEqual(1, len(self.app_conf.subscription.get_all()))
def test_add_network_functions_per_subscription(self):
- for nf in [self.nf_1, self.nf_2]:
- self.sub_1.add_network_function_to_subscription(nf)
+ for nf in self.xnfs:
+ self.app_conf.subscription.add_network_function_to_subscription(nf)
nfs_for_sub_1 = Subscription.get_all_nfs_subscription_relations()
- self.assertEqual(2, len(nfs_for_sub_1))
- new_nf = NetworkFunction(nf_name='vnf_3', orchestration_status='Inventoried')
- self.sub_1.add_network_function_to_subscription(new_nf)
+ self.assertEqual(3, len(nfs_for_sub_1))
+ new_nf = NetworkFunction(nf_name='vnf_3', orchestration_status='Active')
+ self.app_conf.subscription.add_network_function_to_subscription(new_nf)
nf_subs = Subscription.get_all_nfs_subscription_relations()
- self.assertEqual(3, len(nf_subs))
+ self.assertEqual(4, len(nf_subs))
def test_add_duplicate_network_functions_per_subscription(self):
- self.sub_1.add_network_function_to_subscription(self.nf_1)
+ self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0])
nf_subs = Subscription.get_all_nfs_subscription_relations()
self.assertEqual(1, len(nf_subs))
- self.sub_1.add_network_function_to_subscription(self.nf_1)
+ self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0])
nf_subs = Subscription.get_all_nfs_subscription_relations()
self.assertEqual(1, len(nf_subs))
def test_update_subscription_status(self):
sub_name = 'ExtraPM-All-gNB-R2B'
- self.sub_1.create()
- self.sub_1.administrativeState = 'new_status'
- self.sub_1.update_subscription_status()
+ self.app_conf.subscription.create()
+ self.app_conf.subscription.administrativeState = 'new_status'
+ self.app_conf.subscription.update_subscription_status()
sub = Subscription.get(sub_name)
self.assertEqual('new_status', sub.status)
def test_delete_subscription(self):
- for nf in [self.nf_1, self.nf_2]:
- self.sub_1.add_network_function_to_subscription(nf)
- for nf in [self.nf_2]:
- self.sub_2.add_network_function_to_subscription(nf)
-
- self.sub_1.delete_subscription()
-
- self.assertEqual(1, len(Subscription.get_all()))
- self.assertEqual(None, Subscription.get(self.sub_1.subscriptionName))
- self.assertEqual(1, len(Subscription.get_all_nfs_subscription_relations()))
- self.assertEqual(1, len(NetworkFunction.get_all()))
- self.assertEqual(None, NetworkFunction.get(nf_name=self.nf_1.nf_name))
+ for nf in self.xnfs:
+ self.app_conf.subscription.add_network_function_to_subscription(nf)
+ self.app_conf.subscription.delete_subscription()
+ self.assertEqual(0, len(Subscription.get_all()))
+ self.assertEqual(None, Subscription.get(self.app_conf.subscription.subscriptionName))
+ self.assertEqual(0, len(Subscription.get_all_nfs_subscription_relations()))
+ self.assertEqual(0, len(NetworkFunction.get_all()))
+ self.assertEqual(None, NetworkFunction.get(nf_name=list(self.xnfs)[0].nf_name))
def test_update_sub_nf_status(self):
sub_name = 'ExtraPM-All-gNB-R2B'
- for nf in [self.nf_1, self.nf_2]:
- self.sub_1.add_network_function_to_subscription(nf)
+ for nf in self.xnfs:
+ self.app_conf.subscription.add_network_function_to_subscription(nf)
sub_nfs = Subscription.get_all_nfs_subscription_relations()
self.assertEqual('PENDING_CREATE', sub_nfs[0].nf_sub_status)
- Subscription.update_sub_nf_status(sub_name, 'Active', 'pnf_1')
+ Subscription.update_sub_nf_status(sub_name, 'Active', 'pnf_23')
sub_nfs = Subscription.get_all_nfs_subscription_relations()
- self.assertEqual('Active', sub_nfs[0].nf_sub_status)
+ self.assertEqual('PENDING_CREATE', sub_nfs[0].nf_sub_status)
self.assertEqual('PENDING_CREATE', sub_nfs[1].nf_sub_status)
@patch('mod.subscription.Subscription.add_network_function_to_subscription')
@@ -175,53 +159,55 @@ class SubscriptionTest(TestCase):
@patch('mod.subscription.Subscription.update_subscription_status')
def test_process_activate_subscription(self, mock_update_sub_status,
mock_update_sub_nf, mock_add_nfs):
- self.sub_1.process_subscription.retry.stop = stop_after_attempt(1)
- self.sub_1.process_subscription([self.nf_1], self.mock_mr_pub, self.mock_app_config)
+ self.app_conf.subscription.process_subscription.retry.stop = stop_after_attempt(1)
+ self.app_conf.subscription.process_subscription([list(self.xnfs)[0]], self.mock_mr_pub,
+ self.app_conf)
mock_update_sub_status.assert_called()
mock_add_nfs.assert_called()
self.assertTrue(self.mock_mr_pub.publish_subscription_event_data.called)
- mock_update_sub_nf.assert_called_with(self.sub_1.subscriptionName,
- 'PENDING_CREATE', self.nf_1.nf_name)
+ mock_update_sub_nf.assert_called_with(self.app_conf.subscription.subscriptionName,
+ 'PENDING_CREATE', list(self.xnfs)[0].nf_name)
@patch('mod.subscription.Subscription.update_sub_nf_status')
@patch('mod.subscription.Subscription.update_subscription_status')
def test_process_deactivate_subscription(self, mock_update_sub_status,
mock_update_sub_nf):
- self.sub_1.administrativeState = 'LOCKED'
- self.sub_1.process_subscription.retry.stop = stop_after_attempt(1)
- self.sub_1.process_subscription([self.nf_1], self.mock_mr_pub, self.mock_app_config)
+ self.app_conf.subscription.administrativeState = 'LOCKED'
+ self.app_conf.subscription.process_subscription.retry.stop = stop_after_attempt(1)
+ self.app_conf.subscription.process_subscription([list(self.xnfs)[0]], self.mock_mr_pub,
+ self.app_conf)
self.assertTrue(self.mock_mr_pub.publish_subscription_event_data.called)
- mock_update_sub_nf.assert_called_with(self.sub_1.subscriptionName,
- 'PENDING_DELETE', self.nf_1.nf_name)
+ mock_update_sub_nf.assert_called_with(self.app_conf.subscription.subscriptionName,
+ 'PENDING_DELETE', list(self.xnfs)[0].nf_name)
mock_update_sub_status.assert_called()
def test_process_subscription_exception(self):
- self.sub_1.process_subscription.retry.stop = stop_after_attempt(1)
- self.assertRaises(Exception, self.sub_1.process_subscription,
- [self.nf_1], 'not_mr_pub', 'app_config')
+ self.app_conf.subscription.process_subscription.retry.stop = stop_after_attempt(1)
+ self.assertRaises(Exception, self.app_conf.subscription.process_subscription,
+ [list(self.xnfs)[0]], 'not_mr_pub', 'app_config')
def test_prepare_subscription_event(self):
with open(os.path.join(os.path.dirname(__file__),
'data/pm_subscription_event.json'), 'r') as data:
expected_sub_event = json.load(data)
- app_conf = AppConfig(**self.cbs_data_1['config'])
- actual_sub_event = self.sub_1.prepare_subscription_event(self.nf_1.nf_name, app_conf)
+ actual_sub_event = self.app_conf.subscription.prepare_subscription_event('pnf_1',
+ self.app_conf)
self.assertEqual(expected_sub_event, actual_sub_event)
def test_get_nf_models(self):
- for nf in [self.nf_1, self.nf_2]:
- self.sub_1.add_network_function_to_subscription(nf)
- nf_models = self.sub_1._get_nf_models()
+ for nf in self.xnfs:
+ self.app_conf.subscription.add_network_function_to_subscription(nf)
+ nf_models = self.app_conf.subscription._get_nf_models()
- self.assertEqual(2, len(nf_models))
+ self.assertEqual(3, len(nf_models))
self.assertIsInstance(nf_models[0], NetworkFunctionModel)
def test_get_network_functions(self):
- for nf in [self.nf_1, self.nf_2]:
- self.sub_1.add_network_function_to_subscription(nf)
- nfs = self.sub_1.get_network_functions()
+ for nf in self.xnfs:
+ self.app_conf.subscription.add_network_function_to_subscription(nf)
+ nfs = self.app_conf.subscription.get_network_functions()
- self.assertEqual(2, len(nfs))
+ self.assertEqual(3, len(nfs))
self.assertIsInstance(nfs[0], NetworkFunction)
diff --git a/components/pm-subscription-handler/tests/test_subscription_handler.py b/components/pm-subscription-handler/tests/test_subscription_handler.py
index 3eb12bca..a277da66 100644
--- a/components/pm-subscription-handler/tests/test_subscription_handler.py
+++ b/components/pm-subscription-handler/tests/test_subscription_handler.py
@@ -21,82 +21,78 @@ from unittest import TestCase
from unittest.mock import patch
from mod.network_function import NetworkFunction
+from mod.pmsh_utils import AppConfig
from mod.subscription import AdministrativeState
from mod.subscription_handler import SubscriptionHandler
class SubscriptionHandlerTest(TestCase):
+ @patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
@patch('mod.create_app')
- @patch('mod.subscription.Subscription')
@patch('mod.pmsh_utils._MrPub')
- @patch('mod.pmsh_utils.AppConfig')
@patch('mod.pmsh_utils.PeriodicTask')
- def setUp(self, mock_aai_event_thread, mock_app_conf, mock_mr_pub,
- mock_sub, mock_app):
+ def setUp(self, mock_aai_event_thread, mock_mr_pub, mock_app, mock_get_pmsh_config):
with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
- self.cbs_data_1 = json.load(data)
+ self.cbs_data = json.load(data)
+ mock_get_pmsh_config.return_value = self.cbs_data
+ self.app_conf = AppConfig()
self.mock_app = mock_app
- self.mock_sub = mock_sub
self.mock_mr_pub = mock_mr_pub
- self.mock_app_conf = mock_app_conf
self.mock_aai_event_thread = mock_aai_event_thread
self.nf_1 = NetworkFunction(nf_name='pnf_1')
self.nf_2 = NetworkFunction(nf_name='pnf_2')
self.nfs = [self.nf_1, self.nf_2]
+ def tearDown(self):
+ pass
+
@patch('mod.logger.info')
- @patch('mod.aai_client.get_pmsh_subscription_data')
+ @patch('mod.aai_client.get_pmsh_nfs_from_aai')
def test_execute_no_change_of_state(self, mock_get_aai, mock_logger):
- mock_get_aai.return_value = self.mock_sub, self.nfs
+ mock_get_aai.return_value = self.nfs
sub_handler = SubscriptionHandler(AdministrativeState.UNLOCKED.value, self.mock_mr_pub,
- self.mock_app, self.mock_app_conf,
+ self.mock_app, self.app_conf,
self.mock_aai_event_thread)
- with patch('mod.pmsh_utils.ConfigHandler.get_pmsh_config', return_value=self.cbs_data_1):
- sub_handler.execute()
+ sub_handler.execute()
mock_logger.assert_called_with('Administrative State did not change in the Config')
- @patch('mod.aai_client.get_pmsh_subscription_data')
- def test_execute_change_of_state_unlocked(self, mock_get_aai):
- mock_get_aai.return_value = self.mock_sub, self.nfs
+ @patch('mod.subscription.Subscription.process_subscription')
+ @patch('mod.aai_client.get_pmsh_nfs_from_aai')
+ def test_execute_change_of_state_unlocked(self, mock_get_aai, mock_process_sub):
+ mock_get_aai.return_value = self.nfs
self.mock_aai_event_thread.return_value.start.return_value = 'start_method'
sub_handler = SubscriptionHandler(AdministrativeState.LOCKED.value, self.mock_mr_pub,
- self.mock_app, self.mock_app_conf,
+ self.mock_app, self.app_conf,
self.mock_aai_event_thread.return_value)
- with patch('mod.pmsh_utils.ConfigHandler.get_pmsh_config', return_value=self.cbs_data_1):
- sub_handler.execute()
-
+ sub_handler.execute()
self.assertEqual(AdministrativeState.UNLOCKED.value, sub_handler.administrative_state)
- self.mock_sub.process_subscription.assert_called_with(self.nfs, self.mock_mr_pub,
- self.mock_app_conf)
+ mock_process_sub.assert_called_with(self.nfs, self.mock_mr_pub, self.app_conf)
self.mock_aai_event_thread.return_value.start.assert_called()
- @patch('mod.aai_client.get_pmsh_subscription_data')
- def test_execute_change_of_state_locked(self, mock_get_aai):
- mock_get_aai.return_value = self.mock_sub, self.nfs
+ @patch('mod.subscription.Subscription.process_subscription')
+ @patch('mod.aai_client.get_pmsh_nfs_from_aai')
+ def test_execute_change_of_state_locked(self, mock_get_aai, mock_process_sub):
+ mock_get_aai.return_value = self.nfs
self.mock_aai_event_thread.return_value.cancel.return_value = 'cancel_method'
- self.cbs_data_1['policy']['subscription']['administrativeState'] = \
- AdministrativeState.LOCKED.value
+ self.app_conf.subscription.administrativeState = AdministrativeState.LOCKED.value
sub_handler = SubscriptionHandler(AdministrativeState.UNLOCKED.value, self.mock_mr_pub,
- self.mock_app, self.mock_app_conf,
+ self.mock_app, self.app_conf,
self.mock_aai_event_thread.return_value)
- with patch('mod.pmsh_utils.ConfigHandler.get_pmsh_config', return_value=self.cbs_data_1):
- sub_handler.execute()
-
+ sub_handler.execute()
self.assertEqual(AdministrativeState.LOCKED.value, sub_handler.administrative_state)
- self.mock_sub.process_subscription.assert_called_with(self.nfs, self.mock_mr_pub,
- self.mock_app_conf)
+ mock_process_sub.assert_called_with(self.nfs, self.mock_mr_pub, self.app_conf)
self.mock_aai_event_thread.return_value.cancel.assert_called()
+ self.app_conf.subscription.administrativeState = AdministrativeState.UNLOCKED.value
+ @patch('mod.subscription.Subscription.process_subscription')
@patch('mod.logger.error')
- @patch('mod.aai_client.get_pmsh_subscription_data')
- def test_execute_exception(self, mock_get_aai, mock_logger):
- mock_get_aai.return_value = self.mock_sub, self.nfs
- self.mock_sub.process_subscription.side_effect = Exception
+ @patch('mod.aai_client.get_pmsh_nfs_from_aai')
+ def test_execute_exception(self, mock_get_aai, mock_logger, mock_process_sub):
+ mock_get_aai.return_value = self.nfs
+ mock_process_sub.side_effect = Exception
sub_handler = SubscriptionHandler(AdministrativeState.LOCKED.value, self.mock_mr_pub,
- self.mock_app, self.mock_app_conf,
+ self.mock_app, self.app_conf,
self.mock_aai_event_thread)
- with patch('mod.pmsh_utils.ConfigHandler.get_pmsh_config', return_value=self.cbs_data_1):
- sub_handler.execute()
-
+ sub_handler.execute()
mock_logger.assert_called_with('Error occurred during the activation/deactivation process ')