From 38ccb471732faaad6a25defee0753c1c5ac60cf0 Mon Sep 17 00:00:00 2001 From: efiacor Date: Wed, 5 Aug 2020 10:12:04 +0100 Subject: Refactor and bug fixes Signed-off-by: efiacor Change-Id: I8fe91bfdd2f1a2c8a6ca914e52d82dce04bffc0e Issue-ID: DCAEGEN2-2146 --- .../pmsh_service/mod/__init__.py | 2 + .../pmsh_service/mod/aai_client.py | 6 +- .../pmsh_service/mod/aai_event_handler.py | 3 +- .../pmsh_service/mod/exit_handler.py | 14 +- .../pmsh_service/mod/network_function.py | 9 +- .../pmsh_service/mod/pmsh_utils.py | 32 ++-- .../pmsh_service/mod/subscription.py | 205 +++++++++++---------- .../pmsh_service/mod/subscription_handler.py | 44 +++-- .../pmsh_service/pmsh_service_main.py | 21 +-- 9 files changed, 179 insertions(+), 157 deletions(-) (limited to 'components/pm-subscription-handler/pmsh_service') diff --git a/components/pm-subscription-handler/pmsh_service/mod/__init__.py b/components/pm-subscription-handler/pmsh_service/mod/__init__.py index 4c86ccda..58cd8b3c 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/__init__.py +++ b/components/pm-subscription-handler/pmsh_service/mod/__init__.py @@ -43,9 +43,11 @@ def launch_api_server(app_config): connex_app = _get_app() connex_app.add_api('api/pmsh_swagger.yml') if app_config.enable_tls: + logger.info('Launching secure http API server') connex_app.run(port=os.environ.get('PMSH_API_PORT', '8443'), ssl_context=app_config.cert_params) else: + logger.info('Launching unsecure http API server') connex_app.run(port=os.environ.get('PMSH_API_PORT', '8443')) diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py index bd052741..5a3c543a 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py +++ b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py @@ -114,11 +114,11 @@ def _filter_nf_data(nf_data, nf_filter): Returns a list of filtered NetworkFunctions using the nf_filter. Args: - nf_data : the nf json data from AAI. - nf_filter: the `NetworkFunctionFilter ` to be applied. + nf_data(dict): the nf json data from AAI. + nf_filter(NetworkFunctionFilter): the NetworkFunctionFilter to be applied. Returns: - set: a set of filtered NetworkFunctions. + set(NetworkFunctions): a set of filtered NetworkFunctions. Raises: KeyError: if AAI data cannot be parsed. diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py index f1e8cf27..60b69602 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py @@ -72,8 +72,7 @@ def _process_event(action, new_status, xnf_name, mr_pub, app_conf): local_xnf = NetworkFunction.get(xnf_name) if local_xnf is None: - logger.info(f'Activating subscription for network function {xnf_name}') - app_conf.subscription.process_subscription([NetworkFunction( + app_conf.subscription.activate_subscription([NetworkFunction( nf_name=xnf_name, orchestration_status=new_status)], mr_pub, app_conf) else: logger.debug(f"Update Event for network function {xnf_name} will not be processed " diff --git a/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py b/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py index 3d02375d..12932966 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py @@ -25,28 +25,28 @@ class ExitHandler: Args: periodic_tasks (List[PeriodicTask]): PeriodicTasks that needs to be cancelled. + app_conf (AppConfig): The PMSH Application Configuration. subscription_handler (SubscriptionHandler): The subscription handler instance. """ shutdown_signal_received = False - def __init__(self, *, periodic_tasks, subscription_handler): + def __init__(self, *, periodic_tasks, app_conf, subscription_handler): self.periodic_tasks = periodic_tasks + self.app_conf = app_conf self.subscription_handler = subscription_handler def __call__(self, sig_num, frame): logger.info('Graceful shutdown of PMSH initiated.') logger.debug(f'ExitHandler was called with signal number: {sig_num}.') - current_sub = self.subscription_handler.current_sub - if current_sub and current_sub.administrativeState == AdministrativeState.UNLOCKED.value: + current_sub = self.app_conf.subscription + if current_sub.administrativeState == AdministrativeState.UNLOCKED.value: try: + current_sub.deactivate_subscription(self.subscription_handler.mr_pub, self.app_conf) + current_sub.update_subscription_status() for thread in self.periodic_tasks: logger.debug(f'Cancelling periodic task with thread name: {thread.name}.') thread.cancel() - current_sub.administrativeState = AdministrativeState.LOCKED.value - current_sub.process_subscription(current_sub.get_network_functions(), - self.subscription_handler.mr_pub, - self.subscription_handler.app_conf) except Exception as e: logger.error(f'Failed to shut down PMSH application: {e}', exc_info=True) ExitHandler.shutdown_signal_received = True diff --git a/components/pm-subscription-handler/pmsh_service/mod/network_function.py b/components/pm-subscription-handler/pmsh_service/mod/network_function.py index 979cc775..191e9512 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/network_function.py +++ b/components/pm-subscription-handler/pmsh_service/mod/network_function.py @@ -17,7 +17,6 @@ # ============LICENSE_END===================================================== import re -from enum import Enum from mod import logger, db from mod.api.db_models import NetworkFunctionModel @@ -55,7 +54,6 @@ class NetworkFunction: db.session.commit() logger.info(f'Network Function {new_nf.nf_name} successfully created.') return new_nf - else: logger.debug(f'Network function {existing_nf.nf_name} already exists,' f' returning this network function..') @@ -109,9 +107,4 @@ class NetworkFunctionFilter: bool: True if matched, else False. """ return self.regex_matcher.search(nf_name) and \ - orchestration_status == OrchestrationStatus.ACTIVE.value - - -class OrchestrationStatus(Enum): - ACTIVE = 'Active' - INVENTORIED = 'Inventoried' + orchestration_status == 'Active' diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py index 50eb122b..872843d7 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py +++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py @@ -15,7 +15,6 @@ # # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END===================================================== -import threading import uuid from os import getenv from threading import Timer @@ -45,20 +44,22 @@ def mdc_handler(function): return decorator -class ThreadSafeSingleton(type): - _instances = {} - _singleton_lock = threading.Lock() +class MySingleton(object): + instances = {} - def __call__(cls, *args, **kwargs): - # double-checked locking pattern (https://en.wikipedia.org/wiki/Double-checked_locking) - if cls not in cls._instances: - with cls._singleton_lock: - if cls not in cls._instances: - cls._instances[cls] = super(ThreadSafeSingleton, cls).__call__(*args, **kwargs) - return cls._instances[cls] + def __new__(cls, clz=None): + if clz is None: + if cls.__name__ not in MySingleton.instances: + MySingleton.instances[cls.__name__] = \ + object.__new__(cls) + return MySingleton.instances[cls.__name__] + MySingleton.instances[clz.__name__] = clz() + MySingleton.first = clz + return type(clz.__name__, (MySingleton,), dict(clz.__dict__)) -class AppConfig(metaclass=ThreadSafeSingleton): +class AppConfig: + INSTANCE = None def __init__(self): try: @@ -78,6 +79,11 @@ class AppConfig(metaclass=ThreadSafeSingleton): self.subscription = Subscription(**conf['policy']['subscription']) self.nf_filter = NetworkFunctionFilter(**self.subscription.nfFilter) + def __new__(cls, *args, **kwargs): + if AppConfig.INSTANCE is None: + AppConfig.INSTANCE = super().__new__(cls, *args, **kwargs) + return AppConfig.INSTANCE + @mdc_handler @retry(wait=wait_fixed(5), stop=stop_after_attempt(5), retry=retry_if_exception_type(Exception)) def _get_pmsh_config(self, **kwargs): @@ -272,7 +278,7 @@ class _MrSub(_DmaapMrClient): if response.ok: return response.json() except Exception as e: - logger.error(f'Failed to fetch message from MR: {e}') + logger.error(f'Failed to fetch message from MR: {e}', exc_info=True) raise diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription.py b/components/pm-subscription-handler/pmsh_service/mod/subscription.py index dbcd7a5e..97bfc401 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/subscription.py +++ b/components/pm-subscription-handler/pmsh_service/mod/subscription.py @@ -33,6 +33,7 @@ class SubNfState(Enum): class AdministrativeState(Enum): UNLOCKED = 'UNLOCKED' LOCKED = 'LOCKED' + PENDING = 'PENDING' subscription_nf_states = { @@ -55,28 +56,7 @@ class Subscription: self.fileLocation = kwargs.get('fileLocation') self.nfFilter = kwargs.get('nfFilter') self.measurementGroups = kwargs.get('measurementGroups') - - def prepare_subscription_event(self, xnf_name, app_conf): - """Prepare the sub event for publishing - - Args: - xnf_name: the AAI xnf name. - app_conf (AppConfig): the application configuration. - - Returns: - dict: the Subscription event to be published. - """ - try: - clean_sub = {k: v for k, v in self.__dict__.items() if k != 'nfFilter'} - sub_event = {'nfName': xnf_name, 'policyName': app_conf.operational_policy_name, - 'changeType': 'DELETE' - if self.administrativeState == AdministrativeState.LOCKED.value - else 'CREATE', 'closedLoopControlName': app_conf.control_loop_name, - 'subscription': clean_sub} - return sub_event - except Exception as e: - logger.error(f'Failed to prep Sub event for xNF {xnf_name}: {e}', exc_info=True) - raise + self.create() def create(self): """ Creates a subscription database entry @@ -89,7 +69,7 @@ class Subscription: SubscriptionModel.subscription_name == self.subscriptionName).one_or_none()) if existing_subscription is None: new_subscription = SubscriptionModel(subscription_name=self.subscriptionName, - status=self.administrativeState) + status=AdministrativeState.PENDING.value) db.session.add(new_subscription) db.session.commit() return new_subscription @@ -101,54 +81,111 @@ class Subscription: logger.error(f'Failed to create subscription {self.subscriptionName} in the DB: {e}', exc_info=True) - def add_network_function_to_subscription(self, nf): + def update_subscription_status(self): + """ Updates the status of subscription in subscription table """ + try: + SubscriptionModel.query.filter( + SubscriptionModel.subscription_name == self.subscriptionName)\ + .update({SubscriptionModel.status: self.administrativeState}, + synchronize_session='evaluate') + + db.session.commit() + except Exception as e: + logger.error(f'Failed to update status of subscription: {self.subscriptionName}: {e}', + exc_info=True) + + def delete_subscription(self): + """ Deletes a subscription and all its association from the database. A network function + that is only associated with the subscription being removed will also be deleted.""" + try: + subscription = SubscriptionModel.query.filter( + SubscriptionModel.subscription_name == self.subscriptionName).one_or_none() + if subscription: + for nf_relationship in subscription.nfs: + other_nf_relationship = NfSubRelationalModel.query.filter( + NfSubRelationalModel.subscription_name != self.subscriptionName, + NfSubRelationalModel.nf_name == nf_relationship.nf_name).one_or_none() + if not other_nf_relationship: + db.session.delete(nf_relationship.nf) + db.session.delete(subscription) + db.session.commit() + except Exception as e: + logger.error(f'Failed to delete subscription: {self.subscriptionName} ' + f'and it\'s relations from the DB: {e}', exc_info=True) + + def prepare_subscription_event(self, xnf_name, app_conf): + """Prepare the sub event for publishing + + Args: + xnf_name: the AAI xnf name. + app_conf (AppConfig): the application configuration. + + Returns: + dict: the Subscription event to be published. + """ + try: + clean_sub = {k: v for k, v in self.__dict__.items() if k != 'nfFilter'} + sub_event = {'nfName': xnf_name, 'policyName': app_conf.operational_policy_name, + 'changeType': 'DELETE' + if self.administrativeState == AdministrativeState.LOCKED.value + else 'CREATE', 'closedLoopControlName': app_conf.control_loop_name, + 'subscription': clean_sub} + return sub_event + except Exception as e: + logger.error(f'Failed to prep Sub event for xNF {xnf_name}: {e}', exc_info=True) + raise + + def add_network_function_to_subscription(self, nf, sub_model): """ Associates a network function to a Subscription Args: - nf : A NetworkFunction object. + sub_model(SubscriptionModel): The SubscriptionModel from the DB. + nf(NetworkFunction): A NetworkFunction object. """ - current_sub = self.create() try: current_nf = nf.create() - logger.debug(f'Adding network function {nf.nf_name} to Subscription ' - f'{current_sub.subscription_name}') existing_entry = NfSubRelationalModel.query.filter( - NfSubRelationalModel.subscription_name == current_sub.subscription_name, + NfSubRelationalModel.subscription_name == self.subscriptionName, NfSubRelationalModel.nf_name == current_nf.nf_name).one_or_none() if existing_entry is None: - new_nf_sub = NfSubRelationalModel(current_sub.subscription_name, + new_nf_sub = NfSubRelationalModel(self.subscriptionName, nf.nf_name, SubNfState.PENDING_CREATE.value) - new_nf_sub.nf = current_nf - current_sub.nfs.append(new_nf_sub) - logger.debug(f'Network function {current_nf.nf_name} added to Subscription ' - f'{current_sub.subscription_name}') - db.session.add(current_sub) + sub_model.nfs.append(new_nf_sub) + db.session.add(sub_model) db.session.commit() + logger.info(f'Network function {current_nf.nf_name} added to Subscription ' + f'{self.subscriptionName}') except Exception as e: logger.error(f'Failed to add nf {nf.nf_name} to subscription ' - f'{current_sub.subscription_name}: {e}', exc_info=True) - logger.debug(f'Subscription {current_sub.subscription_name} now contains these XNFs:' - f'{Subscription.get_nf_names_per_sub(current_sub.subscription_name)}') + f'{self.subscriptionName}: {e}', exc_info=True) + logger.debug(f'Subscription {self.subscriptionName} now contains these XNFs:' + f'{Subscription.get_nf_names_per_sub(self.subscriptionName)}') - @staticmethod - def get(subscription_name): - """ Retrieves a subscription - - Args: - subscription_name (str): The subscription name + def get(self): + """ Retrieves a SubscriptionModel object Returns: - Subscription object else None + SubscriptionModel object else None """ return SubscriptionModel.query.filter( - SubscriptionModel.subscription_name == subscription_name).one_or_none() + SubscriptionModel.subscription_name == self.subscriptionName).one_or_none() + + def get_local_sub_admin_state(self): + """ Retrieves the subscription admin state + + Returns: + str: The admin state of the SubscriptionModel + """ + sub_model = SubscriptionModel.query.filter( + SubscriptionModel.subscription_name == self.subscriptionName).one_or_none() + return sub_model.status @staticmethod def get_all(): """ Retrieves a list of subscriptions Returns: - list: Subscription list else empty + list(SubscriptionModel): Subscriptions list else empty """ return SubscriptionModel.query.all() @@ -160,7 +197,7 @@ class Subscription: subscription_name (str): The subscription name Returns: - list: List of network function names + list(str): List of network function names """ nf_sub_rel = NfSubRelationalModel.query.filter( NfSubRelationalModel.subscription_name == subscription_name).all() @@ -170,65 +207,41 @@ class Subscription: return list_of_nfs - def update_subscription_status(self): - """ Updates the status of subscription in subscription table """ - try: - SubscriptionModel.query.filter( - SubscriptionModel.subscription_name == self.subscriptionName)\ - .update({SubscriptionModel.status: self.administrativeState}, - synchronize_session='evaluate') - - db.session.commit() - except Exception as e: - logger.error(f'Failed to update status of subscription: {self.subscriptionName}: {e}', - exc_info=True) - - def delete_subscription(self): - """ Deletes a subscription and all its association from the database. A network function - that is only associated with the subscription being removed will also be deleted.""" - try: - subscription = SubscriptionModel.query.filter( - SubscriptionModel.subscription_name == self.subscriptionName).one_or_none() - if subscription: - for nf_relationship in subscription.nfs: - other_nf_relationship = NfSubRelationalModel.query.filter( - NfSubRelationalModel.subscription_name != self.subscriptionName, - NfSubRelationalModel.nf_name == nf_relationship.nf_name).one_or_none() - if not other_nf_relationship: - db.session.delete(nf_relationship.nf) - db.session.delete(subscription) - db.session.commit() - except Exception as e: - logger.error(f'Failed to delete subscription: {self.subscriptionName} ' - f'and it\'s relations from the DB: {e}', exc_info=True) - - def process_subscription(self, nfs, mr_pub, app_conf): - action = 'Deactivate' - sub_nf_state = SubNfState.PENDING_DELETE.value - self.update_subscription_status() - - if self.administrativeState == AdministrativeState.UNLOCKED.value: - action = 'Activate' - sub_nf_state = SubNfState.PENDING_CREATE.value - logger.info(f'{action} subscription initiated for {self.subscriptionName}.') - + def activate_subscription(self, nfs, mr_pub, app_conf): + logger.info(f'Activate subscription initiated for {self.subscriptionName}.') try: + sub_model = self.get() for nf in nfs: mr_pub.publish_subscription_event_data(self, nf.nf_name, app_conf) - logger.debug(f'Publishing Event to {action} ' - f'Sub: {self.subscriptionName} for the nf: {nf.nf_name}') - if action == 'Activate': - self.add_network_function_to_subscription(nf) - self.update_sub_nf_status(self.subscriptionName, sub_nf_state, nf.nf_name) + logger.info(f'Publishing event to activate ' + f'Sub: {self.subscriptionName} for the nf: {nf.nf_name}') + self.add_network_function_to_subscription(nf, sub_model) + self.update_sub_nf_status(self.subscriptionName, SubNfState.PENDING_CREATE.value, + nf.nf_name) except Exception as err: raise Exception(f'Error publishing activation event to MR: {err}') + def deactivate_subscription(self, mr_pub, app_conf): + nfs = self.get_network_functions() + try: + if nfs: + logger.info(f'Deactivate subscription initiated for {self.subscriptionName}.') + for nf in nfs: + mr_pub.publish_subscription_event_data(self, nf.nf_name, app_conf) + logger.debug(f'Publishing Event to deactivate ' + f'Sub: {self.subscriptionName} for the nf: {nf.nf_name}') + self.update_sub_nf_status(self.subscriptionName, + SubNfState.PENDING_DELETE.value, + nf.nf_name) + except Exception as err: + raise Exception(f'Error publishing deactivation event to MR: {err}') + @staticmethod def get_all_nfs_subscription_relations(): """ Retrieves all network function to subscription relations Returns: - list: NetworkFunctions per Subscription list else empty + list(NfSubRelationalModel): NetworkFunctions per Subscription list else empty """ nf_per_subscriptions = NfSubRelationalModel.query.all() return nf_per_subscriptions diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py index 74b6ac88..e74a1732 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py @@ -22,14 +22,12 @@ from mod.subscription import AdministrativeState class SubscriptionHandler: - def __init__(self, administrative_state, mr_pub, app, app_conf, aai_event_thread): - self.current_nfs = None - self.current_sub = None - self.administrative_state = administrative_state + def __init__(self, mr_pub, app, app_conf, aai_event_thread, policy_event_thread): self.mr_pub = mr_pub self.app = app self.app_conf = app_conf self.aai_event_thread = aai_event_thread + self.policy_event_thread = policy_event_thread def execute(self): """ @@ -37,25 +35,37 @@ class SubscriptionHandler: the Subscription if a change has occurred """ self.app.app_context().push() + local_admin_state = self.app_conf.subscription.get_local_sub_admin_state() new_administrative_state = self.app_conf.subscription.administrativeState try: - if self.administrative_state == new_administrative_state: + if local_admin_state == new_administrative_state: logger.info('Administrative State did not change in the Config') else: - self.current_nfs = aai.get_pmsh_nfs_from_aai(self.app_conf) - self.current_sub = self.app_conf.subscription - logger.info(f'Administrative State has changed from {self.administrative_state} ' - f'to {new_administrative_state}.') - self.administrative_state = new_administrative_state - self.current_sub.process_subscription(self.current_nfs, self.mr_pub, self.app_conf) - if new_administrative_state == AdministrativeState.UNLOCKED.value: - logger.info('Listening to AAI-EVENT topic in MR.') - self.aai_event_thread.start() + self._activate(local_admin_state, new_administrative_state) + elif local_admin_state == AdministrativeState.PENDING.value: + logger.info('Administrative State is PENDING') else: - logger.info('Stop listening to AAI-EVENT topic in MR.') - self.aai_event_thread.cancel() - + self._deactivate(local_admin_state, new_administrative_state) except Exception as err: logger.error(f'Error occurred during the activation/deactivation process {err}', exc_info=True) + + def _activate(self, local_admin_state, new_administrative_state): + logger.info(f'Administrative State has changed from {local_admin_state} ' + f'to {new_administrative_state}.') + existing_nfs_in_aai = aai.get_pmsh_nfs_from_aai(self.app_conf) + self.app_conf.subscription.activate_subscription(existing_nfs_in_aai, self.mr_pub, + self.app_conf) + self.app_conf.subscription.update_subscription_status() + logger.info('Start listening for new NFs on AAI-EVENT topic in MR.') + self.aai_event_thread.start() + self.policy_event_thread.start() + + def _deactivate(self, local_admin_state, new_administrative_state): + logger.info(f'Administrative State has changed from {local_admin_state} ' + f'to {new_administrative_state}.') + self.aai_event_thread.cancel() + logger.info('Stop listening for NFs on AAI-EVENT topic in MR.') + self.app_conf.subscription.deactivate_subscription(self.mr_pub, self.app_conf) + self.app_conf.subscription.update_subscription_status() diff --git a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py index b3c906d0..6b6b9baa 100755 --- a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py +++ b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py @@ -23,7 +23,6 @@ from mod.aai_event_handler import process_aai_events from mod.exit_handler import ExitHandler from mod.pmsh_utils import AppConfig, PeriodicTask from mod.policy_response_handler import PolicyResponseHandler -from mod.subscription import Subscription, AdministrativeState from mod.subscription_handler import SubscriptionHandler @@ -40,27 +39,27 @@ def main(): except Exception as e: logger.error(f'Failed to get config and create application: {e}', exc_info=True) sys.exit(e) - subscription_in_db = Subscription.get(app_conf.subscription.subscriptionName) - administrative_state = subscription_in_db.status if subscription_in_db \ - else AdministrativeState.LOCKED.value app_conf_thread = PeriodicTask(10, app_conf.refresh_config) app_conf_thread.start() - aai_event_thread = PeriodicTask(10, process_aai_events, - args=(aai_event_mr_sub, policy_mr_pub, app, app_conf)) - subscription_handler = SubscriptionHandler(administrative_state, - policy_mr_pub, app, app_conf, aai_event_thread) + policy_response_handler = PolicyResponseHandler(policy_mr_sub, app_conf, app) + policy_response_handler_thread = PeriodicTask(25, policy_response_handler.poll_policy_topic) + + aai_event_thread = PeriodicTask(20, process_aai_events, + args=(aai_event_mr_sub, policy_mr_pub, app, app_conf)) + + subscription_handler = SubscriptionHandler(policy_mr_pub, app, app_conf, aai_event_thread, + policy_response_handler_thread) subscription_handler_thread = PeriodicTask(30, subscription_handler.execute) - policy_response_handler_thread = PeriodicTask(10, policy_response_handler.poll_policy_topic) subscription_handler_thread.start() - policy_response_handler_thread.start() + periodic_tasks = [app_conf_thread, aai_event_thread, subscription_handler_thread, policy_response_handler_thread] signal(SIGTERM, ExitHandler(periodic_tasks=periodic_tasks, - subscription_handler=subscription_handler)) + app_conf=app_conf, subscription_handler=subscription_handler)) launch_api_server(app_conf) except Exception as e: -- cgit 1.2.3-korg