From f7be006e7cc638788164fb1028d03898138b8c16 Mon Sep 17 00:00:00 2001 From: efiacor Date: Wed, 13 Jan 2021 16:12:57 +0000 Subject: [PMSH] Update sub object on activate Signed-off-by: efiacor Change-Id: Id9418301e0cb4d373339b9b3e3476f7db5770f3e Issue-ID: DCAEGEN2-2152 --- components/pm-subscription-handler/Changelog.md | 1 + .../pmsh_service/mod/exit_handler.py | 7 +- .../pmsh_service/mod/network_function.py | 8 +-- .../pmsh_service/mod/pmsh_utils.py | 27 ++++---- .../pmsh_service/mod/subscription.py | 6 ++ .../pmsh_service/mod/subscription_handler.py | 80 +++++++++++++--------- .../pmsh_service/pmsh_service_main.py | 15 +--- .../tests/test_pmsh_utils.py | 7 +- .../tests/test_subscription_handler.py | 43 ++++++------ 9 files changed, 100 insertions(+), 94 deletions(-) (limited to 'components/pm-subscription-handler') diff --git a/components/pm-subscription-handler/Changelog.md b/components/pm-subscription-handler/Changelog.md index d0fb4c0c..8988508c 100755 --- a/components/pm-subscription-handler/Changelog.md +++ b/components/pm-subscription-handler/Changelog.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). * Bug fix prevent sub threads from crashing permanently (DCAEGEN2-2501) * Added Resource Name (model-name) to filter (DCAEGEN2-2402) * Added retry mechanism for DELETE_FAILED subscriptions on given NFs (DCAEGEN2-2152) +* Added func to update the subscription object on ACTIVATE/UNLOCK (DCAEGEN2-2152) ## [1.1.2] ### Changed diff --git a/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py b/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py index aed86307..1ea83e59 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py @@ -1,5 +1,5 @@ # ============LICENSE_START=================================================== -# Copyright (C) 2020 Nordix Foundation. +# Copyright (C) 2020-2021 Nordix Foundation. # ============================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -39,10 +39,7 @@ class ExitHandler: def __call__(self, sig_num, frame): logger.info('Graceful shutdown of PMSH initiated.') logger.debug(f'ExitHandler was called with signal number: {sig_num}.') - for thread in self.periodic_tasks: - if thread.name == 'aai_event_thread': - logger.info(f'Cancelling thread {thread.name}') - thread.cancel() + self.subscription_handler.stop_aai_event_thread() current_sub = self.app_conf.subscription if current_sub.administrativeState == AdministrativeState.UNLOCKED.value: try: diff --git a/components/pm-subscription-handler/pmsh_service/mod/network_function.py b/components/pm-subscription-handler/pmsh_service/mod/network_function.py index 02656356..83130a8e 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/network_function.py +++ b/components/pm-subscription-handler/pmsh_service/mod/network_function.py @@ -1,5 +1,5 @@ # ============LICENSE_START=================================================== -# Copyright (C) 2020 Nordix Foundation. +# Copyright (C) 2020-2021 Nordix Foundation. # ============================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,7 +18,6 @@ import re -import mod.aai_client from mod import logger, db from mod.api.db_models import NetworkFunctionModel @@ -89,8 +88,9 @@ class NetworkFunction: def set_nf_model_params(self, app_conf): params_set = True try: - sdnc_model_data = mod.aai_client.get_aai_model_data(app_conf, self.model_invariant_id, - self.model_version_id, self.nf_name) + from mod.aai_client import get_aai_model_data + sdnc_model_data = get_aai_model_data(app_conf, self.model_invariant_id, + self.model_version_id, self.nf_name) try: self.sdnc_model_name = sdnc_model_data['sdnc-model-name'] diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py index c6be38d0..18834134 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py +++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py @@ -1,5 +1,5 @@ # ============LICENSE_START=================================================== -# Copyright (C) 2019-2020 Nordix Foundation. +# Copyright (C) 2019-2021 Nordix Foundation. # ============================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -62,10 +62,7 @@ class AppConfig: INSTANCE = None def __init__(self): - try: - conf = self._get_pmsh_config() - except Exception: - raise + conf = self._get_pmsh_config() self.aaf_creds = {'aaf_id': conf['config'].get('aaf_identity'), 'aaf_pass': conf['config'].get('aaf_password')} self.enable_tls = conf['config'].get('enable_tls') @@ -85,7 +82,8 @@ class AppConfig: return AppConfig.INSTANCE @mdc_handler - @retry(wait=wait_fixed(5), stop=stop_after_attempt(5), retry=retry_if_exception_type(Exception)) + @retry(wait=wait_fixed(5), stop=stop_after_attempt(5), + retry=retry_if_exception_type(ValueError)) def _get_pmsh_config(self, **kwargs): """ Retrieves PMSH's configuration from Config binding service. If a non-2xx response is received, it retries after 2 seconds for 5 times before raising an exception. @@ -97,13 +95,13 @@ class AppConfig: Exception: If any error occurred pulling configuration from Config binding service. """ try: - logger.info('Fetching PMSH Configuration from CBS.') + logger.info('Attempting to fetch PMSH Configuration from CBS.') config = get_all() logger.info(f'Successfully fetched PMSH config from CBS: {config}') return config - except Exception as err: - logger.error(f'Failed to get config from CBS: {err}', exc_info=True) - raise Exception + except Exception as e: + logger.error(f'Failed to get config from CBS: {e}', exc_info=True) + raise ValueError(e) def refresh_config(self): """ @@ -114,11 +112,11 @@ class AppConfig: """ try: app_conf = self._get_pmsh_config() - self.subscription.administrativeState = app_conf['policy']['subscription'][ - 'administrativeState'] + self.subscription = Subscription(**app_conf['policy']['subscription']) logger.info("AppConfig data has been refreshed") - except ValueError or Exception as e: - logger.error(f'Failed to refresh AppConfig: {e}', exc_info=True) + except Exception: + logger.error('Failed to refresh PMSH AppConfig') + raise def get_mr_sub(self, sub_name): """ @@ -235,6 +233,7 @@ class _MrPub(_DmaapMrClient): self.publish_to_topic(subscription_event) except Exception as e: logger.error(f'Failed to publish to topic {self.topic_url}: {e}', exc_info=True) + raise e class _MrSub(_DmaapMrClient): diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription.py b/components/pm-subscription-handler/pmsh_service/mod/subscription.py index 8443c9de..fdc1394c 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/subscription.py +++ b/components/pm-subscription-handler/pmsh_service/mod/subscription.py @@ -71,6 +71,12 @@ class Subscription: self.measurementGroups = kwargs.get('measurementGroups') self.create() + def update_sub_params(self, admin_state, file_based_gp, file_location, meas_groups): + self.administrativeState = admin_state + self.fileBasedGP = file_based_gp + self.fileLocation = file_location + self.measurementGroups = meas_groups + def create(self): """ Creates a subscription database entry diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py index a273a446..f50f5ab2 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py @@ -1,5 +1,5 @@ # ============LICENSE_START=================================================== -# Copyright (C) 2020 Nordix Foundation. +# Copyright (C) 2020-2021 Nordix Foundation. # ============================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,17 +16,19 @@ # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END===================================================== -import mod.aai_client as aai -from mod import logger +from mod import logger, aai_client +from mod.aai_event_handler import process_aai_events +from mod.pmsh_utils import PeriodicTask from mod.subscription import AdministrativeState class SubscriptionHandler: - def __init__(self, mr_pub, app, app_conf, aai_event_thread): + def __init__(self, mr_pub, aai_sub, app, app_conf): self.mr_pub = mr_pub + self.aai_sub = aai_sub self.app = app self.app_conf = app_conf - self.aai_event_thread = aai_event_thread + self.aai_event_thread = None def execute(self): """ @@ -34,53 +36,69 @@ class SubscriptionHandler: the Subscription if a change has occurred """ self.app.app_context().push() - self.app_conf.refresh_config() - local_admin_state = self.app_conf.subscription.get_local_sub_admin_state() - new_administrative_state = self.app_conf.subscription.administrativeState try: - if local_admin_state == new_administrative_state: - logger.info(f'Administrative State did not change in the app config: ' - f'{new_administrative_state}') + local_admin_state = self.app_conf.subscription.get_local_sub_admin_state() + if local_admin_state == AdministrativeState.LOCKING.value: + self._check_for_failed_nfs() else: - self._check_state_change(local_admin_state, new_administrative_state) + self.app_conf.refresh_config() + new_administrative_state = self.app_conf.subscription.administrativeState + if local_admin_state == new_administrative_state: + logger.info(f'Administrative State did not change in the app config: ' + f'{new_administrative_state}') + else: + self._check_state_change(local_admin_state, new_administrative_state) except Exception as err: logger.error(f'Error occurred during the activation/deactivation process {err}', exc_info=True) def _check_state_change(self, local_admin_state, new_administrative_state): - if local_admin_state == AdministrativeState.LOCKING.value: - self._check_for_failed_nfs() + if new_administrative_state == AdministrativeState.UNLOCKED.value: + logger.info(f'Administrative State has changed from {local_admin_state} ' + f'to {new_administrative_state}.') + self._activate(new_administrative_state) + elif new_administrative_state == AdministrativeState.LOCKED.value: + logger.info(f'Administrative State has changed from {local_admin_state} ' + f'to {new_administrative_state}.') + self._deactivate() else: - if new_administrative_state == AdministrativeState.UNLOCKED.value: - logger.info(f'Administrative State has changed from {local_admin_state} ' - f'to {new_administrative_state}.') - self._activate() - elif new_administrative_state == AdministrativeState.LOCKED.value: - logger.info(f'Administrative State has changed from {local_admin_state} ' - f'to {new_administrative_state}.') - self._deactivate() - else: - logger.error(f'Invalid AdministrativeState: {new_administrative_state}') + raise Exception(f'Invalid AdministrativeState: {new_administrative_state}') - def _activate(self): - nfs_in_aai = aai.get_pmsh_nfs_from_aai(self.app_conf) + def _activate(self, new_administrative_state): + self._start_aai_event_thread() + self.app_conf.subscription.update_sub_params(new_administrative_state, + self.app_conf.subscription.fileBasedGP, + self.app_conf.subscription.fileLocation, + self.app_conf.subscription.measurementGroups) + nfs_in_aai = aai_client.get_pmsh_nfs_from_aai(self.app_conf) self.app_conf.subscription.create_subscription_on_nfs(nfs_in_aai, self.mr_pub, self.app_conf) self.app_conf.subscription.update_subscription_status() - if not self.aai_event_thread.is_alive(): - logger.info('Start polling for NF info on AAI-EVENT topic on DMaaP MR.') - self.aai_event_thread.start() def _deactivate(self): nfs = self.app_conf.subscription.get_network_functions() if nfs: - self.aai_event_thread.cancel() - logger.info('Stop listening for NFs events on AAI-EVENT topic in MR.') + self.stop_aai_event_thread() self.app_conf.subscription.administrativeState = AdministrativeState.LOCKING.value logger.info('Subscription is now LOCKING/DEACTIVATING.') self.app_conf.subscription.delete_subscription_from_nfs(nfs, self.mr_pub, self.app_conf) self.app_conf.subscription.update_subscription_status() + def _start_aai_event_thread(self): + logger.info('Starting polling for NF info on AAI-EVENT topic on DMaaP MR.') + self.aai_event_thread = PeriodicTask(20, process_aai_events, args=(self.aai_sub, + self.mr_pub, + self.app, + self.app_conf)) + self.aai_event_thread.name = 'aai_event_thread' + self.aai_event_thread.start() + + def stop_aai_event_thread(self): + if self.aai_event_thread is not None: + self.aai_event_thread.cancel() + self.aai_event_thread = None + logger.info('Stopping polling for NFs events on AAI-EVENT topic in MR.') + def _check_for_failed_nfs(self): logger.info('Checking for DELETE_FAILED NFs before LOCKING Subscription.') del_failed_nfs = self.app_conf.subscription.get_delete_failed_nfs() diff --git a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py index 307235db..4f2ca4a1 100755 --- a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py +++ b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py @@ -1,5 +1,5 @@ # ============LICENSE_START=================================================== -# Copyright (C) 2019-2020 Nordix Foundation. +# Copyright (C) 2019-2021 Nordix Foundation. # ============================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,7 +19,6 @@ import sys from signal import signal, SIGTERM from mod import db, create_app, launch_api_server, logger -from mod.aai_event_handler import process_aai_events from mod.exit_handler import ExitHandler from mod.pmsh_utils import AppConfig, PeriodicTask from mod.policy_response_handler import PolicyResponseHandler @@ -46,20 +45,12 @@ def main(): logger.info('Start polling PMSH_CL_INPUT topic on DMaaP MR.') policy_response_handler_thread.start() - aai_event_thread = PeriodicTask(20, process_aai_events, - args=(aai_event_mr_sub, policy_mr_pub, app, app_conf)) - aai_event_thread.name = 'aai_event_thread' - logger.info('Start polling for NF info on AAI-EVENT topic on DMaaP MR.') - aai_event_thread.start() - - subscription_handler = SubscriptionHandler(policy_mr_pub, app, app_conf, aai_event_thread) - + subscription_handler = SubscriptionHandler(policy_mr_pub, aai_event_mr_sub, app, app_conf) subscription_handler_thread = PeriodicTask(20, subscription_handler.execute) subscription_handler_thread.name = 'sub_handler_thread' subscription_handler_thread.start() - periodic_tasks = [aai_event_thread, subscription_handler_thread, - policy_response_handler_thread] + periodic_tasks = [subscription_handler_thread, policy_response_handler_thread] signal(SIGTERM, ExitHandler(periodic_tasks=periodic_tasks, app_conf=app_conf, subscription_handler=subscription_handler)) diff --git a/components/pm-subscription-handler/tests/test_pmsh_utils.py b/components/pm-subscription-handler/tests/test_pmsh_utils.py index 1bc039d2..602253b8 100644 --- a/components/pm-subscription-handler/tests/test_pmsh_utils.py +++ b/components/pm-subscription-handler/tests/test_pmsh_utils.py @@ -24,7 +24,6 @@ from tenacity import RetryError from mod import get_db_connection_url from mod.network_function import NetworkFunction -from mod.pmsh_utils import AppConfig from tests.base_setup import BaseClassSetup from tests.base_setup import get_pmsh_config @@ -139,9 +138,7 @@ class PmshUtilsTestCase(BaseClassSetup): @patch('mod.logger.error') @patch('mod.pmsh_utils.get_all') def test_refresh_config_fail(self, mock_cbs_client_get_all, mock_logger): - mock_cbs_client_get_all.return_value = get_pmsh_config() - self.app_conf = AppConfig() - mock_cbs_client_get_all.side_effect = Exception + mock_cbs_client_get_all.side_effect = ValueError with self.assertRaises(RetryError): self.app_conf.refresh_config() - mock_logger.assert_called_with('Failed to get config from CBS: ', exc_info=True) + mock_logger.assert_called_with('Failed to refresh PMSH AppConfig') diff --git a/components/pm-subscription-handler/tests/test_subscription_handler.py b/components/pm-subscription-handler/tests/test_subscription_handler.py index f77dfd10..31dd0943 100644 --- a/components/pm-subscription-handler/tests/test_subscription_handler.py +++ b/components/pm-subscription-handler/tests/test_subscription_handler.py @@ -1,5 +1,5 @@ # ============LICENSE_START=================================================== -# Copyright (C) 2020 Nordix Foundation. +# Copyright (C) 2020-2021 Nordix Foundation. # ============================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -33,11 +33,12 @@ class SubscriptionHandlerTest(BaseClassSetup): def setUpClass(cls): super().setUpClass() + @patch('mod.pmsh_utils._MrSub') @patch('mod.pmsh_utils._MrPub') - def setUp(self, mock_mr_pub): + def setUp(self, mock_mr_pub, mock_mr_sub): super().setUp() self.mock_mr_pub = mock_mr_pub - self.mock_aai_event_thread = Mock() + self.mock_mr_sub = mock_mr_sub self.mock_policy_event_thread = Mock() def tearDown(self): @@ -55,12 +56,13 @@ class SubscriptionHandlerTest(BaseClassSetup): mock_get_sub_status.return_value = AdministrativeState.UNLOCKED.value mock_get_aai.return_value = self.nfs sub_handler = SubscriptionHandler(self.mock_mr_pub, - self.app, self.app_conf, - self.mock_aai_event_thread) + self.mock_mr_sub, self.app, self.app_conf) sub_handler.execute() mock_logger.assert_called_with('Administrative State did not change ' 'in the app config: UNLOCKED') + @patch('mod.subscription_handler.SubscriptionHandler._start_aai_event_thread', + MagicMock()) @patch('mod.pmsh_utils.AppConfig.refresh_config', MagicMock(return_value=get_pmsh_config())) @patch('mod.subscription.Subscription.get_local_sub_admin_state') @patch('mod.subscription.Subscription.create_subscription_on_nfs') @@ -69,10 +71,8 @@ class SubscriptionHandlerTest(BaseClassSetup): mock_get_sub_status): mock_get_aai.return_value = self.nfs mock_get_sub_status.return_value = AdministrativeState.LOCKED.value - self.mock_aai_event_thread.return_value.start.return_value = 'start_method' - sub_handler = SubscriptionHandler(self.mock_mr_pub, - self.app, self.app_conf, - self.mock_aai_event_thread.return_value) + sub_handler = SubscriptionHandler(self.mock_mr_pub, self.mock_mr_sub, self.app, + self.app_conf) sub_handler.execute() self.assertEqual(AdministrativeState.UNLOCKED.value, self.app_conf.subscription.administrativeState) @@ -86,14 +86,12 @@ class SubscriptionHandlerTest(BaseClassSetup): mock_get_sub_status.return_value = AdministrativeState.UNLOCKED.value self.app_conf.subscription.administrativeState = AdministrativeState.LOCKED.value self.app_conf.subscription.update_subscription_status() - self.mock_aai_event_thread.return_value.cancel.return_value = 'cancel_method' - sub_handler = SubscriptionHandler(self.mock_mr_pub, - self.app, self.app_conf, - self.mock_aai_event_thread.return_value) + sub_handler = SubscriptionHandler(self.mock_mr_pub, self.mock_mr_sub, self.app, + self.app_conf) sub_handler.execute() mock_deactivate_sub.assert_called_with(self.nfs, self.mock_mr_pub, self.app_conf) - self.mock_aai_event_thread.return_value.cancel.assert_called() + @patch('mod.subscription_handler.SubscriptionHandler._start_aai_event_thread', MagicMock()) @patch('mod.pmsh_utils.AppConfig.refresh_config', MagicMock(return_value=get_pmsh_config())) @patch('mod.subscription.Subscription.create_subscription_on_nfs') @patch('mod.logger.error') @@ -101,9 +99,8 @@ class SubscriptionHandlerTest(BaseClassSetup): def test_execute_exception(self, mock_get_aai, mock_logger, mock_activate_sub): mock_get_aai.return_value = self.nfs mock_activate_sub.side_effect = Exception - sub_handler = SubscriptionHandler(self.mock_mr_pub, - self.app, self.app_conf, - self.mock_aai_event_thread) + sub_handler = SubscriptionHandler(self.mock_mr_pub, self.mock_mr_sub, self.app, + self.app_conf) sub_handler.execute() mock_logger.assert_called_with('Error occurred during the activation/deactivation process ', exc_info=True) @@ -125,8 +122,8 @@ class SubscriptionHandlerTest(BaseClassSetup): def test_execute_change_of_state_to_locking_retry_delete(self, mock_retry_inc, mock_delete_sub, mock_get_sub_status): mock_get_sub_status.return_value = AdministrativeState.LOCKING.value - sub_handler = SubscriptionHandler(self.mock_mr_pub, self.app, self.app_conf, - self.mock_aai_event_thread) + sub_handler = SubscriptionHandler(self.mock_mr_pub, self.mock_mr_sub, self.app, + self.app_conf) sub_handler.execute() self.assertEqual(mock_delete_sub.call_count, 2) self.assertEqual(mock_retry_inc.call_count, 2) @@ -139,8 +136,8 @@ class SubscriptionHandlerTest(BaseClassSetup): def test_execute_change_of_state_to_locking_success(self, mock_update_sub, mock_get_sub_status): mock_get_sub_status.return_value = AdministrativeState.LOCKING.value - sub_handler = SubscriptionHandler(self.mock_mr_pub, self.app, self.app_conf, - self.mock_aai_event_thread) + sub_handler = SubscriptionHandler(self.mock_mr_pub, self.mock_mr_sub, self.app, + self.app_conf) sub_handler.execute() mock_update_sub.assert_called_once() @@ -161,7 +158,7 @@ class SubscriptionHandlerTest(BaseClassSetup): def test_execute_change_of_state_to_locking_retry_failed(self, mock_nf_del, mock_get_sub_status): mock_get_sub_status.return_value = AdministrativeState.LOCKING.value - sub_handler = SubscriptionHandler(self.mock_mr_pub, self.app, self.app_conf, - self.mock_aai_event_thread) + sub_handler = SubscriptionHandler(self.mock_mr_pub, self.mock_mr_sub, self.app, + self.app_conf) sub_handler.execute() mock_nf_del.assert_called_once() -- cgit 1.2.3-korg