summaryrefslogtreecommitdiffstats
path: root/components/pm-subscription-handler/tests/test_aai_event_handler.py
diff options
context:
space:
mode:
Diffstat (limited to 'components/pm-subscription-handler/tests/test_aai_event_handler.py')
-rwxr-xr-xcomponents/pm-subscription-handler/tests/test_aai_event_handler.py33
1 files changed, 25 insertions, 8 deletions
diff --git a/components/pm-subscription-handler/tests/test_aai_event_handler.py b/components/pm-subscription-handler/tests/test_aai_event_handler.py
index d366dac5..9ac76477 100755
--- a/components/pm-subscription-handler/tests/test_aai_event_handler.py
+++ b/components/pm-subscription-handler/tests/test_aai_event_handler.py
@@ -16,42 +16,59 @@
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================
import json
+import os
from os import path
+from test.support import EnvironmentVarGuard
from unittest import TestCase
from unittest.mock import patch, Mock
+from mod import create_app, db
from mod.aai_event_handler import process_aai_events
-from mod.network_function import NetworkFunction, OrchestrationStatus
+from mod.network_function import NetworkFunction
from mod.pmsh_utils import AppConfig
class AAIEventHandlerTest(TestCase):
+ @patch('mod.get_db_connection_url')
+ @patch('mod.update_logging_config')
@patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
- def setUp(self, mock_get_pmsh_config):
+ def setUp(self, mock_get_pmsh_config, mock_update_config, mock_get_db_url):
+ mock_get_db_url.return_value = 'sqlite://'
with open(path.join(path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
self.cbs_data = json.load(data)
mock_get_pmsh_config.return_value = self.cbs_data
self.app_conf = AppConfig()
with open(path.join(path.dirname(__file__), 'data/mr_aai_events.json'), 'r') as data:
self.mr_aai_events = json.load(data)["mr_response"]
+ self.env = EnvironmentVarGuard()
+ self.env.set('LOGGER_CONFIG', os.path.join(os.path.dirname(__file__), 'log_config.yaml'))
self.mock_mr_sub = Mock(get_from_topic=Mock(return_value=self.mr_aai_events))
self.mock_mr_pub = Mock()
self.mock_app = Mock()
+ self.app = create_app()
+ self.app_context = self.app.app_context()
+ self.app_context.push()
+ db.create_all()
- @patch('mod.subscription.Subscription.process_subscription')
+ def tearDown(self):
+ db.session.remove()
+ db.drop_all()
+ self.app_context.pop()
+
+ @patch('mod.subscription.Subscription.activate_subscription')
@patch('mod.aai_event_handler.NetworkFunction.delete')
@patch('mod.aai_event_handler.NetworkFunction.get')
def test_process_aai_update_and_delete_events(self, mock_nf_get, mock_nf_delete,
- mock_process_sub):
+ mock_activate_sub):
pnf_already_active = NetworkFunction(nf_name='pnf_already_active',
- orchestration_status=OrchestrationStatus.ACTIVE.value)
+ orchestration_status='Active')
mock_nf_get.side_effect = [None, pnf_already_active]
expected_nf_for_processing = NetworkFunction(
- nf_name='pnf_newly_discovered', orchestration_status=OrchestrationStatus.ACTIVE.value)
+ nf_name='pnf_newly_discovered', orchestration_status='Active')
process_aai_events(self.mock_mr_sub, self.mock_mr_pub, self.mock_app, self.app_conf)
- mock_process_sub.assert_called_once_with([expected_nf_for_processing],
- self.mock_mr_pub, self.app_conf)
+ mock_activate_sub.assert_called_once_with([expected_nf_for_processing],
+ self.mock_mr_pub, self.app_conf)
mock_nf_delete.assert_called_once_with(nf_name='pnf_to_be_deleted')