summaryrefslogtreecommitdiffstats
path: root/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py
diff options
context:
space:
mode:
authorefiacor <fiachra.corcoran@est.tech>2021-01-13 16:12:57 +0000
committerefiacor <fiachra.corcoran@est.tech>2021-01-26 16:47:26 +0000
commitf7be006e7cc638788164fb1028d03898138b8c16 (patch)
tree253d460ca148495d31203b9038b1e24ec209d7e9 /components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py
parent2510fa240ca7395ff8a36762d5892413ab05fd68 (diff)
[PMSH] Update sub object on activate
Signed-off-by: efiacor <fiachra.corcoran@est.tech> Change-Id: Id9418301e0cb4d373339b9b3e3476f7db5770f3e Issue-ID: DCAEGEN2-2152
Diffstat (limited to 'components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py')
-rw-r--r--components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py80
1 files changed, 49 insertions, 31 deletions
diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py
index a273a446..f50f5ab2 100644
--- a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py
@@ -1,5 +1,5 @@
# ============LICENSE_START===================================================
-# Copyright (C) 2020 Nordix Foundation.
+# Copyright (C) 2020-2021 Nordix Foundation.
# ============================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,17 +16,19 @@
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================
-import mod.aai_client as aai
-from mod import logger
+from mod import logger, aai_client
+from mod.aai_event_handler import process_aai_events
+from mod.pmsh_utils import PeriodicTask
from mod.subscription import AdministrativeState
class SubscriptionHandler:
- def __init__(self, mr_pub, app, app_conf, aai_event_thread):
+ def __init__(self, mr_pub, aai_sub, app, app_conf):
self.mr_pub = mr_pub
+ self.aai_sub = aai_sub
self.app = app
self.app_conf = app_conf
- self.aai_event_thread = aai_event_thread
+ self.aai_event_thread = None
def execute(self):
"""
@@ -34,53 +36,69 @@ class SubscriptionHandler:
the Subscription if a change has occurred
"""
self.app.app_context().push()
- self.app_conf.refresh_config()
- local_admin_state = self.app_conf.subscription.get_local_sub_admin_state()
- new_administrative_state = self.app_conf.subscription.administrativeState
try:
- if local_admin_state == new_administrative_state:
- logger.info(f'Administrative State did not change in the app config: '
- f'{new_administrative_state}')
+ local_admin_state = self.app_conf.subscription.get_local_sub_admin_state()
+ if local_admin_state == AdministrativeState.LOCKING.value:
+ self._check_for_failed_nfs()
else:
- self._check_state_change(local_admin_state, new_administrative_state)
+ self.app_conf.refresh_config()
+ new_administrative_state = self.app_conf.subscription.administrativeState
+ if local_admin_state == new_administrative_state:
+ logger.info(f'Administrative State did not change in the app config: '
+ f'{new_administrative_state}')
+ else:
+ self._check_state_change(local_admin_state, new_administrative_state)
except Exception as err:
logger.error(f'Error occurred during the activation/deactivation process {err}',
exc_info=True)
def _check_state_change(self, local_admin_state, new_administrative_state):
- if local_admin_state == AdministrativeState.LOCKING.value:
- self._check_for_failed_nfs()
+ if new_administrative_state == AdministrativeState.UNLOCKED.value:
+ logger.info(f'Administrative State has changed from {local_admin_state} '
+ f'to {new_administrative_state}.')
+ self._activate(new_administrative_state)
+ elif new_administrative_state == AdministrativeState.LOCKED.value:
+ logger.info(f'Administrative State has changed from {local_admin_state} '
+ f'to {new_administrative_state}.')
+ self._deactivate()
else:
- if new_administrative_state == AdministrativeState.UNLOCKED.value:
- logger.info(f'Administrative State has changed from {local_admin_state} '
- f'to {new_administrative_state}.')
- self._activate()
- elif new_administrative_state == AdministrativeState.LOCKED.value:
- logger.info(f'Administrative State has changed from {local_admin_state} '
- f'to {new_administrative_state}.')
- self._deactivate()
- else:
- logger.error(f'Invalid AdministrativeState: {new_administrative_state}')
+ raise Exception(f'Invalid AdministrativeState: {new_administrative_state}')
- def _activate(self):
- nfs_in_aai = aai.get_pmsh_nfs_from_aai(self.app_conf)
+ def _activate(self, new_administrative_state):
+ self._start_aai_event_thread()
+ self.app_conf.subscription.update_sub_params(new_administrative_state,
+ self.app_conf.subscription.fileBasedGP,
+ self.app_conf.subscription.fileLocation,
+ self.app_conf.subscription.measurementGroups)
+ nfs_in_aai = aai_client.get_pmsh_nfs_from_aai(self.app_conf)
self.app_conf.subscription.create_subscription_on_nfs(nfs_in_aai, self.mr_pub,
self.app_conf)
self.app_conf.subscription.update_subscription_status()
- if not self.aai_event_thread.is_alive():
- logger.info('Start polling for NF info on AAI-EVENT topic on DMaaP MR.')
- self.aai_event_thread.start()
def _deactivate(self):
nfs = self.app_conf.subscription.get_network_functions()
if nfs:
- self.aai_event_thread.cancel()
- logger.info('Stop listening for NFs events on AAI-EVENT topic in MR.')
+ self.stop_aai_event_thread()
self.app_conf.subscription.administrativeState = AdministrativeState.LOCKING.value
logger.info('Subscription is now LOCKING/DEACTIVATING.')
self.app_conf.subscription.delete_subscription_from_nfs(nfs, self.mr_pub, self.app_conf)
self.app_conf.subscription.update_subscription_status()
+ def _start_aai_event_thread(self):
+ logger.info('Starting polling for NF info on AAI-EVENT topic on DMaaP MR.')
+ self.aai_event_thread = PeriodicTask(20, process_aai_events, args=(self.aai_sub,
+ self.mr_pub,
+ self.app,
+ self.app_conf))
+ self.aai_event_thread.name = 'aai_event_thread'
+ self.aai_event_thread.start()
+
+ def stop_aai_event_thread(self):
+ if self.aai_event_thread is not None:
+ self.aai_event_thread.cancel()
+ self.aai_event_thread = None
+ logger.info('Stopping polling for NFs events on AAI-EVENT topic in MR.')
+
def _check_for_failed_nfs(self):
logger.info('Checking for DELETE_FAILED NFs before LOCKING Subscription.')
del_failed_nfs = self.app_conf.subscription.get_delete_failed_nfs()