diff options
Diffstat (limited to 'components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py')
-rw-r--r-- | components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py | 80 |
1 files changed, 49 insertions, 31 deletions
diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py index a273a446..f50f5ab2 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py @@ -1,5 +1,5 @@ # ============LICENSE_START=================================================== -# Copyright (C) 2020 Nordix Foundation. +# Copyright (C) 2020-2021 Nordix Foundation. # ============================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,17 +16,19 @@ # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END===================================================== -import mod.aai_client as aai -from mod import logger +from mod import logger, aai_client +from mod.aai_event_handler import process_aai_events +from mod.pmsh_utils import PeriodicTask from mod.subscription import AdministrativeState class SubscriptionHandler: - def __init__(self, mr_pub, app, app_conf, aai_event_thread): + def __init__(self, mr_pub, aai_sub, app, app_conf): self.mr_pub = mr_pub + self.aai_sub = aai_sub self.app = app self.app_conf = app_conf - self.aai_event_thread = aai_event_thread + self.aai_event_thread = None def execute(self): """ @@ -34,53 +36,69 @@ class SubscriptionHandler: the Subscription if a change has occurred """ self.app.app_context().push() - self.app_conf.refresh_config() - local_admin_state = self.app_conf.subscription.get_local_sub_admin_state() - new_administrative_state = self.app_conf.subscription.administrativeState try: - if local_admin_state == new_administrative_state: - logger.info(f'Administrative State did not change in the app config: ' - f'{new_administrative_state}') + local_admin_state = self.app_conf.subscription.get_local_sub_admin_state() + if local_admin_state == AdministrativeState.LOCKING.value: + self._check_for_failed_nfs() else: - self._check_state_change(local_admin_state, new_administrative_state) + self.app_conf.refresh_config() + new_administrative_state = self.app_conf.subscription.administrativeState + if local_admin_state == new_administrative_state: + logger.info(f'Administrative State did not change in the app config: ' + f'{new_administrative_state}') + else: + self._check_state_change(local_admin_state, new_administrative_state) except Exception as err: logger.error(f'Error occurred during the activation/deactivation process {err}', exc_info=True) def _check_state_change(self, local_admin_state, new_administrative_state): - if local_admin_state == AdministrativeState.LOCKING.value: - self._check_for_failed_nfs() + if new_administrative_state == AdministrativeState.UNLOCKED.value: + logger.info(f'Administrative State has changed from {local_admin_state} ' + f'to {new_administrative_state}.') + self._activate(new_administrative_state) + elif new_administrative_state == AdministrativeState.LOCKED.value: + logger.info(f'Administrative State has changed from {local_admin_state} ' + f'to {new_administrative_state}.') + self._deactivate() else: - if new_administrative_state == AdministrativeState.UNLOCKED.value: - logger.info(f'Administrative State has changed from {local_admin_state} ' - f'to {new_administrative_state}.') - self._activate() - elif new_administrative_state == AdministrativeState.LOCKED.value: - logger.info(f'Administrative State has changed from {local_admin_state} ' - f'to {new_administrative_state}.') - self._deactivate() - else: - logger.error(f'Invalid AdministrativeState: {new_administrative_state}') + raise Exception(f'Invalid AdministrativeState: {new_administrative_state}') - def _activate(self): - nfs_in_aai = aai.get_pmsh_nfs_from_aai(self.app_conf) + def _activate(self, new_administrative_state): + self._start_aai_event_thread() + self.app_conf.subscription.update_sub_params(new_administrative_state, + self.app_conf.subscription.fileBasedGP, + self.app_conf.subscription.fileLocation, + self.app_conf.subscription.measurementGroups) + nfs_in_aai = aai_client.get_pmsh_nfs_from_aai(self.app_conf) self.app_conf.subscription.create_subscription_on_nfs(nfs_in_aai, self.mr_pub, self.app_conf) self.app_conf.subscription.update_subscription_status() - if not self.aai_event_thread.is_alive(): - logger.info('Start polling for NF info on AAI-EVENT topic on DMaaP MR.') - self.aai_event_thread.start() def _deactivate(self): nfs = self.app_conf.subscription.get_network_functions() if nfs: - self.aai_event_thread.cancel() - logger.info('Stop listening for NFs events on AAI-EVENT topic in MR.') + self.stop_aai_event_thread() self.app_conf.subscription.administrativeState = AdministrativeState.LOCKING.value logger.info('Subscription is now LOCKING/DEACTIVATING.') self.app_conf.subscription.delete_subscription_from_nfs(nfs, self.mr_pub, self.app_conf) self.app_conf.subscription.update_subscription_status() + def _start_aai_event_thread(self): + logger.info('Starting polling for NF info on AAI-EVENT topic on DMaaP MR.') + self.aai_event_thread = PeriodicTask(20, process_aai_events, args=(self.aai_sub, + self.mr_pub, + self.app, + self.app_conf)) + self.aai_event_thread.name = 'aai_event_thread' + self.aai_event_thread.start() + + def stop_aai_event_thread(self): + if self.aai_event_thread is not None: + self.aai_event_thread.cancel() + self.aai_event_thread = None + logger.info('Stopping polling for NFs events on AAI-EVENT topic in MR.') + def _check_for_failed_nfs(self): logger.info('Checking for DELETE_FAILED NFs before LOCKING Subscription.') del_failed_nfs = self.app_conf.subscription.get_delete_failed_nfs() |