summaryrefslogtreecommitdiffstats
path: root/components/pm-subscription-handler/tests
diff options
context:
space:
mode:
authorERIMROB <robertas.rimkus@est.tech>2020-02-12 11:35:20 +0000
committerERIMROB <robertas.rimkus@est.tech>2020-02-20 16:03:48 +0000
commit26b76c02052269ea850d8d4efd6deb536115a0af (patch)
treef7485d7ccd0e7d95c000b9c05bce2c371c34581a /components/pm-subscription-handler/tests
parentd42ac06c733c43e19a01b4203c1b987b4973ccfd (diff)
Add Support for Activation and Deactivation
* Add support for reconfiguration of the administrativeState field * Add support for policy feedback handling * Fix network function filter applying to non active network functions Signed-off-by: ERIMROB <robertas.rimkus@est.tech> Change-Id: Ic1cfc3207b2495c1d8d10acd0ed1c40114cf4643 Issue-ID: DCAEGEN2-1830
Diffstat (limited to 'components/pm-subscription-handler/tests')
-rw-r--r--components/pm-subscription-handler/tests/data/aai_xnfs.json8
-rw-r--r--components/pm-subscription-handler/tests/test_aai_service.py14
-rwxr-xr-xcomponents/pm-subscription-handler/tests/test_config_handler.py4
-rwxr-xr-xcomponents/pm-subscription-handler/tests/test_network_function.py16
-rw-r--r--components/pm-subscription-handler/tests/test_pmsh_service.py82
-rw-r--r--components/pm-subscription-handler/tests/test_pmsh_utils.py123
-rwxr-xr-xcomponents/pm-subscription-handler/tests/test_subscription.py86
7 files changed, 293 insertions, 40 deletions
diff --git a/components/pm-subscription-handler/tests/data/aai_xnfs.json b/components/pm-subscription-handler/tests/data/aai_xnfs.json
index 78fc6548..d0c2043e 100644
--- a/components/pm-subscription-handler/tests/data/aai_xnfs.json
+++ b/components/pm-subscription-handler/tests/data/aai_xnfs.json
@@ -36,7 +36,8 @@
"serial-number": "6061ZW3",
"ipaddress-v6-oam": "2001:0db8:0:0:0:0:1428:57ab",
"resource-version": "1573053304574",
- "nf-role": "gNB"
+ "nf-role": "gNB",
+ "orchestration-status": "Active"
}
},
{
@@ -50,7 +51,7 @@
"equip-vendor": "Nokia",
"equip-model": "val6",
"ipaddress-v4-oam": "10.10.10.66",
- "orchestration-status": "Inventoried",
+ "orchestration-status": "Active",
"sw-version": "val7",
"in-maint": false,
"serial-number": "6061ZW3",
@@ -70,6 +71,7 @@
"equip-vendor": "Nokia",
"equip-model": "val6",
"ipaddress-v4-oam": "10.10.13.26",
+ "orchestration-status": "Active",
"sw-version": "val7",
"in-maint": false,
"serial-number": "6061ZW3",
@@ -89,7 +91,7 @@
"equip-vendor": "Ericsson",
"equip-model": "val6",
"ipaddress-v4-oam": "10.40.10.26",
- "orchestration-status": "Inventoried",
+ "orchestration-status": "Inventoried",
"sw-version": "val7",
"in-maint": false,
"serial-number": "6061ZW3",
diff --git a/components/pm-subscription-handler/tests/test_aai_service.py b/components/pm-subscription-handler/tests/test_aai_service.py
index 7f4735a9..aaf2bb14 100644
--- a/components/pm-subscription-handler/tests/test_aai_service.py
+++ b/components/pm-subscription-handler/tests/test_aai_service.py
@@ -17,9 +17,9 @@
# ============LICENSE_END=====================================================
import json
import os
-import unittest
from test.support import EnvironmentVarGuard
-from unittest import mock
+from unittest import mock, TestCase
+from unittest.mock import patch
import responses
from requests import Session
@@ -27,7 +27,7 @@ from requests import Session
import mod.aai_client as aai_client
-class AaiClientTestCase(unittest.TestCase):
+class AaiClientTestCase(TestCase):
def setUp(self):
self.env = EnvironmentVarGuard()
@@ -38,16 +38,16 @@ class AaiClientTestCase(unittest.TestCase):
with open(os.path.join(os.path.dirname(__file__), 'data/aai_xnfs.json'), 'r') as data:
self.aai_response_data = data.read()
- @mock.patch.object(Session, 'put')
+ @patch.object(Session, 'put')
def test_aai_client_get_pm_sub_data_success(self, mock_session):
mock_session.return_value.status_code = 200
mock_session.return_value.text = self.aai_response_data
sub, xnfs = aai_client.get_pmsh_subscription_data(self.cbs_data)
self.assertEqual(sub.subscriptionName, 'ExtraPM-All-gNB-R2B')
self.assertEqual(sub.administrativeState, 'UNLOCKED')
- self.assertEqual(len(xnfs), 6)
+ self.assertEqual(len(xnfs), 3)
- @mock.patch.object(Session, 'put')
+ @patch.object(Session, 'put')
def test_aai_client_get_pm_sub_data_fail(self, mock_session):
mock_session.return_value.status_code = 404
with mock.patch('mod.aai_client._get_all_aai_nf_data', return_value=None):
@@ -73,5 +73,5 @@ class AaiClientTestCase(unittest.TestCase):
with self.assertRaises(KeyError):
aai_client._get_aai_service_url()
- def test_aai_client_get_aai_service_url_succses(self):
+ def test_aai_client_get_aai_service_url_success(self):
self.assertEqual('https://1.2.3.4:8443', aai_client._get_aai_service_url())
diff --git a/components/pm-subscription-handler/tests/test_config_handler.py b/components/pm-subscription-handler/tests/test_config_handler.py
index 5e80db5d..dce48fca 100755
--- a/components/pm-subscription-handler/tests/test_config_handler.py
+++ b/components/pm-subscription-handler/tests/test_config_handler.py
@@ -24,7 +24,7 @@ from os import path
import responses
from tenacity import wait_none
-from pmsh_service.mod.config_handler import ConfigHandler
+from mod.config_handler import ConfigHandler
class ConfigHandlerTestCase(unittest.TestCase):
@@ -49,7 +49,7 @@ class ConfigHandlerTestCase(unittest.TestCase):
@responses.activate
def test_get_config_success(self):
- responses.add(responses.GET, self.cbs_url, json=json.dumps(self.expected_config),
+ responses.add(responses.GET, self.cbs_url, json=self.expected_config,
status=200)
config_handler = ConfigHandler()
diff --git a/components/pm-subscription-handler/tests/test_network_function.py b/components/pm-subscription-handler/tests/test_network_function.py
index 267851d2..e9394b46 100755
--- a/components/pm-subscription-handler/tests/test_network_function.py
+++ b/components/pm-subscription-handler/tests/test_network_function.py
@@ -15,17 +15,17 @@
#
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================
-import unittest
from test.support import EnvironmentVarGuard
-from unittest import mock
+from unittest import TestCase
+from unittest.mock import patch
from mod import db, create_app
from mod.network_function import NetworkFunction
-class NetworkFunctionTests(unittest.TestCase):
+class NetworkFunctionTests(TestCase):
- @mock.patch('mod.get_db_connection_url')
+ @patch('mod.get_db_connection_url')
def setUp(self, mock_get_db_url):
mock_get_db_url.return_value = 'sqlite://'
self.nf_1 = NetworkFunction(nf_name='pnf_1', orchestration_status='Inventoried')
@@ -65,3 +65,11 @@ class NetworkFunctionTests(unittest.TestCase):
same_nf = self.nf_1.create()
self.assertEqual(nf, same_nf)
+
+ def test_delete_network_function(self):
+ self.nf_1.create()
+ self.nf_2.create()
+ self.nf_1.delete(nf_name='pnf_1')
+ nfs = NetworkFunction.get_all()
+
+ self.assertEqual(1, len(nfs))
diff --git a/components/pm-subscription-handler/tests/test_pmsh_service.py b/components/pm-subscription-handler/tests/test_pmsh_service.py
new file mode 100644
index 00000000..4a6032b5
--- /dev/null
+++ b/components/pm-subscription-handler/tests/test_pmsh_service.py
@@ -0,0 +1,82 @@
+# ============LICENSE_START===================================================
+# Copyright (C) 2020 Nordix Foundation.
+# ============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+# ============LICENSE_END=====================================================
+import os
+import json
+from unittest import TestCase
+from unittest.mock import patch
+
+import pmsh_service_main as pmsh_service
+from mod.network_function import NetworkFunction
+
+
+class PMSHServiceTest(TestCase):
+
+ @patch('mod.create_app')
+ @patch('mod.subscription.Subscription')
+ @patch('mod.pmsh_utils._MrPub')
+ @patch('mod.config_handler.ConfigHandler')
+ def setUp(self, mock_config_handler, mock_mr_pub,
+ mock_sub, mock_app):
+ with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
+ self.cbs_data_1 = json.load(data)
+ self.mock_app = mock_app
+ self.mock_sub = mock_sub
+ self.mock_mr_pub = mock_mr_pub
+ self.mock_config_handler = mock_config_handler
+ self.nf_1 = NetworkFunction(nf_name='pnf_1')
+ self.nf_2 = NetworkFunction(nf_name='pnf_2')
+ self.nfs = [self.nf_1, self.nf_2]
+
+ @patch('threading.Timer')
+ @patch('mod.aai_client.get_pmsh_subscription_data')
+ def test_subscription_processor_changed_state(self, mock_get_aai, mock_thread):
+ self.mock_config_handler.get_config.return_value = self.cbs_data_1
+ mock_get_aai.return_value = self.mock_sub, self.nfs
+ mock_thread.start.return_value = 1
+
+ pmsh_service.subscription_processor(self.mock_config_handler, 'LOCKED',
+ self.mock_mr_pub, self.mock_app)
+
+ self.mock_sub.process_subscription.assert_called_with(self.nfs, self.mock_mr_pub)
+
+ @patch('threading.Timer')
+ @patch('mod.pmsh_logging.debug')
+ @patch('mod.aai_client.get_pmsh_subscription_data')
+ def test_subscription_processor_unchanged_state(self, mock_get_aai, mock_logger, mock_thread):
+ self.mock_config_handler.get_config.return_value = self.cbs_data_1
+ mock_get_aai.return_value = self.mock_sub, self.nfs
+ mock_thread.start.return_value = 1
+
+ pmsh_service.subscription_processor(self.mock_config_handler, 'UNLOCKED', self.mock_mr_pub,
+ self.mock_app)
+
+ mock_logger.assert_called_with('Administrative State did not change in the Config')
+
+ @patch('threading.Timer')
+ @patch('mod.pmsh_logging.debug')
+ @patch('mod.aai_client.get_pmsh_subscription_data')
+ def test_subscription_processor_exception(self, mock_get_aai, mock_logger, mock_thread):
+ self.mock_config_handler.get_config.return_value = self.cbs_data_1
+ mock_get_aai.return_value = self.mock_sub, self.nfs
+ mock_thread.start.return_value = 1
+ self.mock_sub.process_subscription.side_effect = Exception
+
+ pmsh_service.subscription_processor(self.mock_config_handler, 'LOCKED', self.mock_mr_pub,
+ self.mock_app)
+ mock_logger.assert_called_with(f'Error occurred during the '
+ f'activation/deactivation process ')
diff --git a/components/pm-subscription-handler/tests/test_pmsh_utils.py b/components/pm-subscription-handler/tests/test_pmsh_utils.py
index 8df2c62a..03e8c691 100644
--- a/components/pm-subscription-handler/tests/test_pmsh_utils.py
+++ b/components/pm-subscription-handler/tests/test_pmsh_utils.py
@@ -17,26 +17,39 @@
# ============LICENSE_END=====================================================
import json
import os
-import unittest
from test.support import EnvironmentVarGuard
-from unittest import mock
+from unittest import TestCase
from unittest.mock import patch
import responses
from requests import Session
+from tenacity import stop_after_attempt
-from mod import get_db_connection_url
-from mod.pmsh_utils import AppConfig
+from mod import db, get_db_connection_url, create_app
+from mod.db_models import SubscriptionModel
+from mod.pmsh_utils import AppConfig, policy_response_handle_functions
from mod.subscription import Subscription
+from mod.network_function import NetworkFunction
-class PmshUtilsTestCase(unittest.TestCase):
+class PmshUtilsTestCase(TestCase):
- def setUp(self):
+ @patch('mod.create_app')
+ @patch('mod.get_db_connection_url')
+ def setUp(self, mock_get_db_url, mock_app):
+ mock_get_db_url.return_value = 'sqlite://'
with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
self.cbs_data = json.load(data)
self.app_conf = AppConfig(**self.cbs_data['config'])
self.sub = Subscription(**self.cbs_data['policy']['subscription'])
+ self.env = EnvironmentVarGuard()
+ self.env.set('LOGS_PATH', './unit_test_logs')
+ self.policy_mr_sub = self.app_conf.get_mr_sub('policy_pm_subscriber')
+ self.mock_app = mock_app
+ self.app = create_app()
+ self.app_context = self.app.app_context()
+ self.app_context.push()
+ db.create_all()
def test_utils_get_mr_sub(self):
mr_policy_sub = self.app_conf.get_mr_sub('policy_pm_subscriber')
@@ -58,7 +71,7 @@ class PmshUtilsTestCase(unittest.TestCase):
self.assertTrue(self.app_conf.cert_params, ('/opt/app/pm-mapper/etc/certs/cert.pem',
'/opt/app/pm-mapper/etc/certs/key.pem'))
- @mock.patch.object(Session, 'post')
+ @patch.object(Session, 'post')
def test_mr_pub_publish_to_topic_success(self, mock_session):
mock_session.return_value.status_code = 200
mr_policy_pub = self.app_conf.get_mr_pub('policy_pm_publisher')
@@ -87,8 +100,7 @@ class PmshUtilsTestCase(unittest.TestCase):
'https://node:30226/events/org.onap.dmaap.mr.PM_SUBSCRIPTIONS/'
'dcae_pmsh_cg/1?timeout=1000',
json={"dummy_val": "43c4ee19-6b8d-4279-a80f-c507850aae47"}, status=200)
- mr_policy_sub = self.app_conf.get_mr_sub('policy_pm_subscriber')
- mr_topic_data = mr_policy_sub.get_from_topic(1)
+ mr_topic_data = self.policy_mr_sub.get_from_topic(1)
self.assertIsNotNone(mr_topic_data)
@responses.activate
@@ -97,8 +109,7 @@ class PmshUtilsTestCase(unittest.TestCase):
'https://node:30226/events/org.onap.dmaap.mr.PM_SUBSCRIPTIONS/'
'dcae_pmsh_cg/1?timeout=1000',
json={"dummy_val": "43c4ee19-6b8d-4279-a80f-c507850aae47"}, status=400)
- mr_policy_sub = self.app_conf.get_mr_sub('policy_pm_subscriber')
- mr_topic_data = mr_policy_sub.get_from_topic(1)
+ mr_topic_data = self.policy_mr_sub.get_from_topic(1)
self.assertIsNone(mr_topic_data)
def test_get_db_connection_url_success(self):
@@ -115,3 +126,93 @@ class PmshUtilsTestCase(unittest.TestCase):
self.env.set('PMSH_PG_PASSWORD', 'pass')
with self.assertRaises(Exception):
get_db_connection_url()
+
+ @patch('mod.pmsh_utils.NetworkFunction.delete')
+ def test_handle_response_locked_success(self, mock_delete):
+ with patch.dict(policy_response_handle_functions, {'LOCKED': {'success': mock_delete}}):
+ administrative_state = 'LOCKED'
+ nf = NetworkFunction(nf_name='nf1')
+ self.policy_mr_sub._handle_response(self.sub.subscriptionName, administrative_state,
+ nf.nf_name, 'success')
+
+ mock_delete.assert_called()
+
+ @patch('mod.subscription.Subscription.update_sub_nf_status')
+ def test_handle_response_locked_failed(self, mock_update_sub_nf):
+ with patch.dict(policy_response_handle_functions,
+ {'LOCKED': {'failed': mock_update_sub_nf}}):
+ administrative_state = 'LOCKED'
+ nf = NetworkFunction(nf_name='nf1')
+ self.policy_mr_sub._handle_response(self.sub.subscriptionName, administrative_state,
+ nf.nf_name, 'failed')
+ mock_update_sub_nf.assert_called()
+
+ @patch('mod.subscription.Subscription.update_sub_nf_status')
+ def test_handle_response_unlocked_success(self, mock_update_sub_nf):
+ with patch.dict(policy_response_handle_functions,
+ {'UNLOCKED': {'success': mock_update_sub_nf}}):
+ nf = NetworkFunction(nf_name='nf1')
+ self.policy_mr_sub._handle_response(self.sub.subscriptionName,
+ self.sub.administrativeState,
+ nf.nf_name, 'success')
+ mock_update_sub_nf.assert_called()
+
+ @patch('mod.subscription.Subscription.update_sub_nf_status')
+ def test_handle_response_unlocked_failed(self, mock_update_sub_nf):
+ with patch.dict(policy_response_handle_functions,
+ {'UNLOCKED': {'failed': mock_update_sub_nf}}):
+ nf = NetworkFunction(nf_name='nf1')
+ self.policy_mr_sub._handle_response(self.sub.subscriptionName,
+ self.sub.administrativeState,
+ nf.nf_name, 'failed')
+ mock_update_sub_nf.assert_called()
+
+ def test_handle_response_exception(self):
+ self.assertRaises(Exception, self.policy_mr_sub._handle_response, 'sub1', 'wrong_state',
+ 'nf1', 'wrong_message')
+
+ @patch('mod.pmsh_utils._MrSub.get_from_topic')
+ @patch('mod.pmsh_utils._MrSub._handle_response')
+ @patch('mod.subscription.Subscription.get')
+ @patch('threading.Timer')
+ def test_poll_policy_topic_calls_methods_correct_sub(self, mock_thread, mock_get_sub,
+ mock_handle_response, mock_get_from_topic):
+ result_data = ['{"name": "ResponseEvent","status": { "subscriptionName": '
+ '"ExtraPM-All-gNB-R2B", "nfName": "pnf300", "message": "success" } }']
+ mock_get_from_topic.return_value = result_data
+ mock_thread.start.return_value = 1
+ mock_get_sub.return_value = SubscriptionModel(subscription_name='ExtraPM-All-gNB-R2B',
+ status='UNLOCKED')
+ self.policy_mr_sub.poll_policy_topic(self.sub.subscriptionName, self.mock_app)
+
+ mock_get_from_topic.assert_called()
+ mock_handle_response.assert_called_with(self.sub.subscriptionName,
+ 'UNLOCKED', 'pnf300', 'success')
+
+ @patch('mod.pmsh_utils._MrSub.get_from_topic')
+ @patch('mod.pmsh_utils._MrSub._handle_response')
+ @patch('mod.subscription.Subscription.get')
+ @patch('threading.Timer')
+ def test_poll_policy_topic_no_method_calls_incorrect_sub(self, mock_thread, mock_get_sub,
+ mock_handle_response,
+ mock_get_from_topic):
+ result_data = ['{"name": "ResponseEvent","status": { "subscriptionName": '
+ '"demo-subscription", "nfName": "pnf300", "message": "success" } }']
+ mock_get_from_topic.return_value = result_data
+ mock_thread.start.return_value = 1
+ mock_get_sub.return_value = SubscriptionModel(subscription_name='ExtraPM-All-gNB-R2B',
+ status='UNLOCKED')
+ self.policy_mr_sub.poll_policy_topic(self.sub, self.mock_app)
+
+ mock_get_from_topic.assert_called()
+ mock_handle_response.assert_not_called()
+
+ @patch('mod.subscription.Subscription.get')
+ @patch('mod.pmsh_utils._MrSub.get_from_topic')
+ def test_poll_policy_topic_exception(self, mock_get_from_topic, mock_get_sub):
+ mock_get_from_topic.return_value = 'wrong_return'
+ mock_get_sub.return_value = SubscriptionModel(subscription_name='ExtraPM-All-gNB-R2B',
+ status='UNLOCKED')
+ self.policy_mr_sub.poll_policy_topic.retry.stop = stop_after_attempt(1)
+
+ self.assertRaises(Exception, self.policy_mr_sub.poll_policy_topic, 'sub1', self.mock_app)
diff --git a/components/pm-subscription-handler/tests/test_subscription.py b/components/pm-subscription-handler/tests/test_subscription.py
index 97c1d6a1..8fe233e4 100755
--- a/components/pm-subscription-handler/tests/test_subscription.py
+++ b/components/pm-subscription-handler/tests/test_subscription.py
@@ -17,11 +17,12 @@
# ============LICENSE_END=====================================================
import json
import os
-import unittest
from test.support import EnvironmentVarGuard
-from unittest import mock
+from unittest import TestCase
+from unittest.mock import patch
from requests import Session
+from tenacity import stop_after_attempt
import mod.aai_client as aai_client
from mod import db, create_app
@@ -29,11 +30,12 @@ from mod.network_function import NetworkFunction
from mod.subscription import Subscription, NetworkFunctionFilter
-class SubscriptionTest(unittest.TestCase):
-
- @mock.patch('mod.get_db_connection_url')
- @mock.patch.object(Session, 'put')
- def setUp(self, mock_session, mock_get_db_url):
+class SubscriptionTest(TestCase):
+ @patch('mod.pmsh_utils._MrPub')
+ @patch('mod.pmsh_utils._MrSub')
+ @patch('mod.get_db_connection_url')
+ @patch.object(Session, 'put')
+ def setUp(self, mock_session, mock_get_db_url, mock_mr_sub, mock_mr_pub):
mock_get_db_url.return_value = 'sqlite://'
with open(os.path.join(os.path.dirname(__file__), 'data/aai_xnfs.json'), 'r') as data:
self.aai_response_data = data.read()
@@ -54,6 +56,8 @@ class SubscriptionTest(unittest.TestCase):
self.nf_1 = NetworkFunction(nf_name='pnf_1', orchestration_status='Inventoried')
self.nf_2 = NetworkFunction(nf_name='pnf_2', orchestration_status='Active')
self.xnf_filter = NetworkFunctionFilter(**self.sub_1.nfFilter)
+ self.mock_mr_sub = mock_mr_sub
+ self.mock_mr_pub = mock_mr_pub
self.app = create_app()
self.app_context = self.app.app_context()
self.app_context.push()
@@ -100,12 +104,6 @@ class SubscriptionTest(unittest.TestCase):
self.assertEqual(sub1, same_sub1)
self.assertEqual(1, len(self.sub_1.get_all()))
- def test_get_nfs_per_subscription(self):
- nf_array = [self.nf_1, self.nf_2]
- self.sub_1.add_network_functions_to_subscription(nf_array)
- nfs_for_sub_1 = Subscription.get_all_nfs_subscription_relations()
- self.assertEqual(2, len(nfs_for_sub_1))
-
def test_add_network_functions_per_subscription(self):
nf_array = [self.nf_1, self.nf_2]
self.sub_1.add_network_functions_to_subscription(nf_array)
@@ -125,3 +123,65 @@ class SubscriptionTest(unittest.TestCase):
self.sub_1.add_network_functions_to_subscription(nf_array)
nf_subs = Subscription.get_all_nfs_subscription_relations()
self.assertEqual(1, len(nf_subs))
+
+ def test_update_subscription_status(self):
+ sub_name = 'ExtraPM-All-gNB-R2B'
+ self.sub_1.create()
+ self.sub_1.administrativeState = 'new_status'
+ self.sub_1.update_subscription_status()
+ sub = Subscription.get(sub_name)
+
+ self.assertEqual('new_status', sub.status)
+
+ def test_delete_subscription(self):
+ self.sub_1.create()
+ subs = self.sub_1.get_all()
+ self.assertEqual(1, len(subs))
+
+ self.sub_1.delete_subscription()
+ new_subs = self.sub_1.get_all()
+ self.assertEqual(0, len(new_subs))
+
+ def test_update_sub_nf_status(self):
+ sub_name = 'ExtraPM-All-gNB-R2B'
+ nf_array = [self.nf_1, self.nf_2]
+ self.sub_1.add_network_functions_to_subscription(nf_array)
+ sub_nfs = Subscription.get_all_nfs_subscription_relations()
+ self.assertEqual('PENDING_CREATE', sub_nfs[0].nf_sub_status)
+
+ Subscription.update_sub_nf_status(sub_name, 'Active', 'pnf_1')
+ sub_nfs = Subscription.get_all_nfs_subscription_relations()
+ self.assertEqual('Active', sub_nfs[0].nf_sub_status)
+ self.assertEqual('PENDING_CREATE', sub_nfs[1].nf_sub_status)
+
+ @patch('mod.subscription.Subscription.add_network_functions_to_subscription')
+ @patch('mod.subscription.Subscription.update_sub_nf_status')
+ @patch('mod.subscription.Subscription.update_subscription_status')
+ def test_process_activate_subscription(self, mock_update_sub_status,
+ mock_update_sub_nf, mock_add_nfs):
+ self.sub_1.process_subscription.retry.stop = stop_after_attempt(1)
+ self.sub_1.process_subscription([self.nf_1], self.mock_mr_pub)
+
+ mock_update_sub_status.assert_called()
+ mock_add_nfs.assert_called()
+ self.assertTrue(self.mock_mr_pub.publish_subscription_event_data.called)
+ mock_update_sub_nf.assert_called_with(self.sub_1.subscriptionName,
+ 'PENDING_CREATE', self.nf_1.nf_name)
+
+ @patch('mod.subscription.Subscription.update_sub_nf_status')
+ @patch('mod.subscription.Subscription.update_subscription_status')
+ def test_process_deactivate_subscription(self, mock_update_sub_status,
+ mock_update_sub_nf):
+ self.sub_1.administrativeState = 'LOCKED'
+ self.sub_1.process_subscription.retry.stop = stop_after_attempt(1)
+ self.sub_1.process_subscription([self.nf_1], self.mock_mr_pub)
+
+ self.assertTrue(self.mock_mr_pub.publish_subscription_event_data.called)
+ mock_update_sub_nf.assert_called_with(self.sub_1.subscriptionName,
+ 'PENDING_DELETE', self.nf_1.nf_name)
+ mock_update_sub_status.assert_called()
+
+ def test_process_subscription_exception(self):
+ self.sub_1.process_subscription.retry.stop = stop_after_attempt(1)
+ self.assertRaises(Exception, self.sub_1.process_subscription,
+ [self.nf_1], 'not_mr_pub')