summaryrefslogtreecommitdiffstats
path: root/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py
diff options
context:
space:
mode:
Diffstat (limited to 'components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py')
-rw-r--r--components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py83
1 files changed, 59 insertions, 24 deletions
diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py
index 6de702f2..a273a446 100644
--- a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py
@@ -22,12 +22,11 @@ from mod.subscription import AdministrativeState
class SubscriptionHandler:
- def __init__(self, mr_pub, app, app_conf, aai_event_thread, policy_event_thread):
+ def __init__(self, mr_pub, app, app_conf, aai_event_thread):
self.mr_pub = mr_pub
self.app = app
self.app_conf = app_conf
self.aai_event_thread = aai_event_thread
- self.policy_event_thread = policy_event_thread
def execute(self):
"""
@@ -35,37 +34,73 @@ class SubscriptionHandler:
the Subscription if a change has occurred
"""
self.app.app_context().push()
+ self.app_conf.refresh_config()
local_admin_state = self.app_conf.subscription.get_local_sub_admin_state()
new_administrative_state = self.app_conf.subscription.administrativeState
try:
if local_admin_state == new_administrative_state:
- logger.info('Administrative State did not change in the Config')
+ logger.info(f'Administrative State did not change in the app config: '
+ f'{new_administrative_state}')
else:
- if new_administrative_state == AdministrativeState.UNLOCKED.value:
- self._activate(local_admin_state, new_administrative_state)
- elif local_admin_state == AdministrativeState.PENDING.value:
- logger.info('Administrative State is PENDING')
- else:
- self._deactivate(local_admin_state, new_administrative_state)
+ self._check_state_change(local_admin_state, new_administrative_state)
except Exception as err:
logger.error(f'Error occurred during the activation/deactivation process {err}',
exc_info=True)
- def _activate(self, local_admin_state, new_administrative_state):
- logger.info(f'Administrative State has changed from {local_admin_state} '
- f'to {new_administrative_state}.')
+ def _check_state_change(self, local_admin_state, new_administrative_state):
+ if local_admin_state == AdministrativeState.LOCKING.value:
+ self._check_for_failed_nfs()
+ else:
+ if new_administrative_state == AdministrativeState.UNLOCKED.value:
+ logger.info(f'Administrative State has changed from {local_admin_state} '
+ f'to {new_administrative_state}.')
+ self._activate()
+ elif new_administrative_state == AdministrativeState.LOCKED.value:
+ logger.info(f'Administrative State has changed from {local_admin_state} '
+ f'to {new_administrative_state}.')
+ self._deactivate()
+ else:
+ logger.error(f'Invalid AdministrativeState: {new_administrative_state}')
+
+ def _activate(self):
nfs_in_aai = aai.get_pmsh_nfs_from_aai(self.app_conf)
- self.app_conf.subscription.activate_subscription(nfs_in_aai, self.mr_pub,
- self.app_conf)
+ self.app_conf.subscription.create_subscription_on_nfs(nfs_in_aai, self.mr_pub,
+ self.app_conf)
self.app_conf.subscription.update_subscription_status()
- logger.info('Start listening for new NFs on AAI-EVENT topic in MR.')
- self.aai_event_thread.start()
- self.policy_event_thread.start()
+ if not self.aai_event_thread.is_alive():
+ logger.info('Start polling for NF info on AAI-EVENT topic on DMaaP MR.')
+ self.aai_event_thread.start()
- def _deactivate(self, local_admin_state, new_administrative_state):
- logger.info(f'Administrative State has changed from {local_admin_state} '
- f'to {new_administrative_state}.')
- self.aai_event_thread.cancel()
- logger.info('Stop listening for NFs on AAI-EVENT topic in MR.')
- self.app_conf.subscription.deactivate_subscription(self.mr_pub, self.app_conf)
- self.app_conf.subscription.update_subscription_status()
+ def _deactivate(self):
+ nfs = self.app_conf.subscription.get_network_functions()
+ if nfs:
+ self.aai_event_thread.cancel()
+ logger.info('Stop listening for NFs events on AAI-EVENT topic in MR.')
+ self.app_conf.subscription.administrativeState = AdministrativeState.LOCKING.value
+ logger.info('Subscription is now LOCKING/DEACTIVATING.')
+ self.app_conf.subscription.delete_subscription_from_nfs(nfs, self.mr_pub, self.app_conf)
+ self.app_conf.subscription.update_subscription_status()
+
+ def _check_for_failed_nfs(self):
+ logger.info('Checking for DELETE_FAILED NFs before LOCKING Subscription.')
+ del_failed_nfs = self.app_conf.subscription.get_delete_failed_nfs()
+ if del_failed_nfs or self.app_conf.subscription.get_delete_pending_nfs():
+ for nf in del_failed_nfs:
+ nf_model = nf.get(nf.nf_name)
+ if nf_model.retry_count < 3:
+ logger.info(f'Retry deletion of subscription '
+ f'{self.app_conf.subscription.subscriptionName} '
+ f'from NF: {nf.nf_name}')
+ self.app_conf.subscription.delete_subscription_from_nfs([nf], self.mr_pub,
+ self.app_conf)
+ nf.increment_retry_count()
+ else:
+ logger.error(f'Failed to delete the subscription '
+ f'{self.app_conf.subscription.subscriptionName} '
+ f'from NF: {nf.nf_name} after {nf_model.retry_count} '
+ f'attempts. Removing NF from DB')
+ nf.delete(nf_name=nf.nf_name)
+ else:
+ logger.info('Proceeding to LOCKED adminState.')
+ self.app_conf.subscription.administrativeState = AdministrativeState.LOCKED.value
+ self.app_conf.subscription.update_subscription_status()