summaryrefslogtreecommitdiffstats
path: root/components/pm-subscription-handler/pmsh_service
diff options
context:
space:
mode:
Diffstat (limited to 'components/pm-subscription-handler/pmsh_service')
-rw-r--r--components/pm-subscription-handler/pmsh_service/mod/__init__.py2
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/aai_client.py6
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py3
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/exit_handler.py14
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/network_function.py9
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py32
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/subscription.py205
-rw-r--r--components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py44
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/pmsh_service_main.py21
9 files changed, 179 insertions, 157 deletions
diff --git a/components/pm-subscription-handler/pmsh_service/mod/__init__.py b/components/pm-subscription-handler/pmsh_service/mod/__init__.py
index 4c86ccda..58cd8b3c 100644
--- a/components/pm-subscription-handler/pmsh_service/mod/__init__.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/__init__.py
@@ -43,9 +43,11 @@ def launch_api_server(app_config):
connex_app = _get_app()
connex_app.add_api('api/pmsh_swagger.yml')
if app_config.enable_tls:
+ logger.info('Launching secure http API server')
connex_app.run(port=os.environ.get('PMSH_API_PORT', '8443'),
ssl_context=app_config.cert_params)
else:
+ logger.info('Launching unsecure http API server')
connex_app.run(port=os.environ.get('PMSH_API_PORT', '8443'))
diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py
index bd052741..5a3c543a 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py
@@ -114,11 +114,11 @@ def _filter_nf_data(nf_data, nf_filter):
Returns a list of filtered NetworkFunctions using the nf_filter.
Args:
- nf_data : the nf json data from AAI.
- nf_filter: the `NetworkFunctionFilter <NetworkFunctionFilter>` to be applied.
+ nf_data(dict): the nf json data from AAI.
+ nf_filter(NetworkFunctionFilter): the NetworkFunctionFilter to be applied.
Returns:
- set: a set of filtered NetworkFunctions.
+ set(NetworkFunctions): a set of filtered NetworkFunctions.
Raises:
KeyError: if AAI data cannot be parsed.
diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py
index f1e8cf27..60b69602 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py
@@ -72,8 +72,7 @@ def _process_event(action, new_status, xnf_name, mr_pub, app_conf):
local_xnf = NetworkFunction.get(xnf_name)
if local_xnf is None:
- logger.info(f'Activating subscription for network function {xnf_name}')
- app_conf.subscription.process_subscription([NetworkFunction(
+ app_conf.subscription.activate_subscription([NetworkFunction(
nf_name=xnf_name, orchestration_status=new_status)], mr_pub, app_conf)
else:
logger.debug(f"Update Event for network function {xnf_name} will not be processed "
diff --git a/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py b/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py
index 3d02375d..12932966 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py
@@ -25,28 +25,28 @@ class ExitHandler:
Args:
periodic_tasks (List[PeriodicTask]): PeriodicTasks that needs to be cancelled.
+ app_conf (AppConfig): The PMSH Application Configuration.
subscription_handler (SubscriptionHandler): The subscription handler instance.
"""
shutdown_signal_received = False
- def __init__(self, *, periodic_tasks, subscription_handler):
+ def __init__(self, *, periodic_tasks, app_conf, subscription_handler):
self.periodic_tasks = periodic_tasks
+ self.app_conf = app_conf
self.subscription_handler = subscription_handler
def __call__(self, sig_num, frame):
logger.info('Graceful shutdown of PMSH initiated.')
logger.debug(f'ExitHandler was called with signal number: {sig_num}.')
- current_sub = self.subscription_handler.current_sub
- if current_sub and current_sub.administrativeState == AdministrativeState.UNLOCKED.value:
+ current_sub = self.app_conf.subscription
+ if current_sub.administrativeState == AdministrativeState.UNLOCKED.value:
try:
+ current_sub.deactivate_subscription(self.subscription_handler.mr_pub, self.app_conf)
+ current_sub.update_subscription_status()
for thread in self.periodic_tasks:
logger.debug(f'Cancelling periodic task with thread name: {thread.name}.')
thread.cancel()
- current_sub.administrativeState = AdministrativeState.LOCKED.value
- current_sub.process_subscription(current_sub.get_network_functions(),
- self.subscription_handler.mr_pub,
- self.subscription_handler.app_conf)
except Exception as e:
logger.error(f'Failed to shut down PMSH application: {e}', exc_info=True)
ExitHandler.shutdown_signal_received = True
diff --git a/components/pm-subscription-handler/pmsh_service/mod/network_function.py b/components/pm-subscription-handler/pmsh_service/mod/network_function.py
index 979cc775..191e9512 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/network_function.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/network_function.py
@@ -17,7 +17,6 @@
# ============LICENSE_END=====================================================
import re
-from enum import Enum
from mod import logger, db
from mod.api.db_models import NetworkFunctionModel
@@ -55,7 +54,6 @@ class NetworkFunction:
db.session.commit()
logger.info(f'Network Function {new_nf.nf_name} successfully created.')
return new_nf
-
else:
logger.debug(f'Network function {existing_nf.nf_name} already exists,'
f' returning this network function..')
@@ -109,9 +107,4 @@ class NetworkFunctionFilter:
bool: True if matched, else False.
"""
return self.regex_matcher.search(nf_name) and \
- orchestration_status == OrchestrationStatus.ACTIVE.value
-
-
-class OrchestrationStatus(Enum):
- ACTIVE = 'Active'
- INVENTORIED = 'Inventoried'
+ orchestration_status == 'Active'
diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
index 50eb122b..872843d7 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
@@ -15,7 +15,6 @@
#
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================
-import threading
import uuid
from os import getenv
from threading import Timer
@@ -45,20 +44,22 @@ def mdc_handler(function):
return decorator
-class ThreadSafeSingleton(type):
- _instances = {}
- _singleton_lock = threading.Lock()
+class MySingleton(object):
+ instances = {}
- def __call__(cls, *args, **kwargs):
- # double-checked locking pattern (https://en.wikipedia.org/wiki/Double-checked_locking)
- if cls not in cls._instances:
- with cls._singleton_lock:
- if cls not in cls._instances:
- cls._instances[cls] = super(ThreadSafeSingleton, cls).__call__(*args, **kwargs)
- return cls._instances[cls]
+ def __new__(cls, clz=None):
+ if clz is None:
+ if cls.__name__ not in MySingleton.instances:
+ MySingleton.instances[cls.__name__] = \
+ object.__new__(cls)
+ return MySingleton.instances[cls.__name__]
+ MySingleton.instances[clz.__name__] = clz()
+ MySingleton.first = clz
+ return type(clz.__name__, (MySingleton,), dict(clz.__dict__))
-class AppConfig(metaclass=ThreadSafeSingleton):
+class AppConfig:
+ INSTANCE = None
def __init__(self):
try:
@@ -78,6 +79,11 @@ class AppConfig(metaclass=ThreadSafeSingleton):
self.subscription = Subscription(**conf['policy']['subscription'])
self.nf_filter = NetworkFunctionFilter(**self.subscription.nfFilter)
+ def __new__(cls, *args, **kwargs):
+ if AppConfig.INSTANCE is None:
+ AppConfig.INSTANCE = super().__new__(cls, *args, **kwargs)
+ return AppConfig.INSTANCE
+
@mdc_handler
@retry(wait=wait_fixed(5), stop=stop_after_attempt(5), retry=retry_if_exception_type(Exception))
def _get_pmsh_config(self, **kwargs):
@@ -272,7 +278,7 @@ class _MrSub(_DmaapMrClient):
if response.ok:
return response.json()
except Exception as e:
- logger.error(f'Failed to fetch message from MR: {e}')
+ logger.error(f'Failed to fetch message from MR: {e}', exc_info=True)
raise
diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription.py b/components/pm-subscription-handler/pmsh_service/mod/subscription.py
index dbcd7a5e..97bfc401 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/subscription.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/subscription.py
@@ -33,6 +33,7 @@ class SubNfState(Enum):
class AdministrativeState(Enum):
UNLOCKED = 'UNLOCKED'
LOCKED = 'LOCKED'
+ PENDING = 'PENDING'
subscription_nf_states = {
@@ -55,28 +56,7 @@ class Subscription:
self.fileLocation = kwargs.get('fileLocation')
self.nfFilter = kwargs.get('nfFilter')
self.measurementGroups = kwargs.get('measurementGroups')
-
- def prepare_subscription_event(self, xnf_name, app_conf):
- """Prepare the sub event for publishing
-
- Args:
- xnf_name: the AAI xnf name.
- app_conf (AppConfig): the application configuration.
-
- Returns:
- dict: the Subscription event to be published.
- """
- try:
- clean_sub = {k: v for k, v in self.__dict__.items() if k != 'nfFilter'}
- sub_event = {'nfName': xnf_name, 'policyName': app_conf.operational_policy_name,
- 'changeType': 'DELETE'
- if self.administrativeState == AdministrativeState.LOCKED.value
- else 'CREATE', 'closedLoopControlName': app_conf.control_loop_name,
- 'subscription': clean_sub}
- return sub_event
- except Exception as e:
- logger.error(f'Failed to prep Sub event for xNF {xnf_name}: {e}', exc_info=True)
- raise
+ self.create()
def create(self):
""" Creates a subscription database entry
@@ -89,7 +69,7 @@ class Subscription:
SubscriptionModel.subscription_name == self.subscriptionName).one_or_none())
if existing_subscription is None:
new_subscription = SubscriptionModel(subscription_name=self.subscriptionName,
- status=self.administrativeState)
+ status=AdministrativeState.PENDING.value)
db.session.add(new_subscription)
db.session.commit()
return new_subscription
@@ -101,54 +81,111 @@ class Subscription:
logger.error(f'Failed to create subscription {self.subscriptionName} in the DB: {e}',
exc_info=True)
- def add_network_function_to_subscription(self, nf):
+ def update_subscription_status(self):
+ """ Updates the status of subscription in subscription table """
+ try:
+ SubscriptionModel.query.filter(
+ SubscriptionModel.subscription_name == self.subscriptionName)\
+ .update({SubscriptionModel.status: self.administrativeState},
+ synchronize_session='evaluate')
+
+ db.session.commit()
+ except Exception as e:
+ logger.error(f'Failed to update status of subscription: {self.subscriptionName}: {e}',
+ exc_info=True)
+
+ def delete_subscription(self):
+ """ Deletes a subscription and all its association from the database. A network function
+ that is only associated with the subscription being removed will also be deleted."""
+ try:
+ subscription = SubscriptionModel.query.filter(
+ SubscriptionModel.subscription_name == self.subscriptionName).one_or_none()
+ if subscription:
+ for nf_relationship in subscription.nfs:
+ other_nf_relationship = NfSubRelationalModel.query.filter(
+ NfSubRelationalModel.subscription_name != self.subscriptionName,
+ NfSubRelationalModel.nf_name == nf_relationship.nf_name).one_or_none()
+ if not other_nf_relationship:
+ db.session.delete(nf_relationship.nf)
+ db.session.delete(subscription)
+ db.session.commit()
+ except Exception as e:
+ logger.error(f'Failed to delete subscription: {self.subscriptionName} '
+ f'and it\'s relations from the DB: {e}', exc_info=True)
+
+ def prepare_subscription_event(self, xnf_name, app_conf):
+ """Prepare the sub event for publishing
+
+ Args:
+ xnf_name: the AAI xnf name.
+ app_conf (AppConfig): the application configuration.
+
+ Returns:
+ dict: the Subscription event to be published.
+ """
+ try:
+ clean_sub = {k: v for k, v in self.__dict__.items() if k != 'nfFilter'}
+ sub_event = {'nfName': xnf_name, 'policyName': app_conf.operational_policy_name,
+ 'changeType': 'DELETE'
+ if self.administrativeState == AdministrativeState.LOCKED.value
+ else 'CREATE', 'closedLoopControlName': app_conf.control_loop_name,
+ 'subscription': clean_sub}
+ return sub_event
+ except Exception as e:
+ logger.error(f'Failed to prep Sub event for xNF {xnf_name}: {e}', exc_info=True)
+ raise
+
+ def add_network_function_to_subscription(self, nf, sub_model):
""" Associates a network function to a Subscription
Args:
- nf : A NetworkFunction object.
+ sub_model(SubscriptionModel): The SubscriptionModel from the DB.
+ nf(NetworkFunction): A NetworkFunction object.
"""
- current_sub = self.create()
try:
current_nf = nf.create()
- logger.debug(f'Adding network function {nf.nf_name} to Subscription '
- f'{current_sub.subscription_name}')
existing_entry = NfSubRelationalModel.query.filter(
- NfSubRelationalModel.subscription_name == current_sub.subscription_name,
+ NfSubRelationalModel.subscription_name == self.subscriptionName,
NfSubRelationalModel.nf_name == current_nf.nf_name).one_or_none()
if existing_entry is None:
- new_nf_sub = NfSubRelationalModel(current_sub.subscription_name,
+ new_nf_sub = NfSubRelationalModel(self.subscriptionName,
nf.nf_name, SubNfState.PENDING_CREATE.value)
- new_nf_sub.nf = current_nf
- current_sub.nfs.append(new_nf_sub)
- logger.debug(f'Network function {current_nf.nf_name} added to Subscription '
- f'{current_sub.subscription_name}')
- db.session.add(current_sub)
+ sub_model.nfs.append(new_nf_sub)
+ db.session.add(sub_model)
db.session.commit()
+ logger.info(f'Network function {current_nf.nf_name} added to Subscription '
+ f'{self.subscriptionName}')
except Exception as e:
logger.error(f'Failed to add nf {nf.nf_name} to subscription '
- f'{current_sub.subscription_name}: {e}', exc_info=True)
- logger.debug(f'Subscription {current_sub.subscription_name} now contains these XNFs:'
- f'{Subscription.get_nf_names_per_sub(current_sub.subscription_name)}')
+ f'{self.subscriptionName}: {e}', exc_info=True)
+ logger.debug(f'Subscription {self.subscriptionName} now contains these XNFs:'
+ f'{Subscription.get_nf_names_per_sub(self.subscriptionName)}')
- @staticmethod
- def get(subscription_name):
- """ Retrieves a subscription
-
- Args:
- subscription_name (str): The subscription name
+ def get(self):
+ """ Retrieves a SubscriptionModel object
Returns:
- Subscription object else None
+ SubscriptionModel object else None
"""
return SubscriptionModel.query.filter(
- SubscriptionModel.subscription_name == subscription_name).one_or_none()
+ SubscriptionModel.subscription_name == self.subscriptionName).one_or_none()
+
+ def get_local_sub_admin_state(self):
+ """ Retrieves the subscription admin state
+
+ Returns:
+ str: The admin state of the SubscriptionModel
+ """
+ sub_model = SubscriptionModel.query.filter(
+ SubscriptionModel.subscription_name == self.subscriptionName).one_or_none()
+ return sub_model.status
@staticmethod
def get_all():
""" Retrieves a list of subscriptions
Returns:
- list: Subscription list else empty
+ list(SubscriptionModel): Subscriptions list else empty
"""
return SubscriptionModel.query.all()
@@ -160,7 +197,7 @@ class Subscription:
subscription_name (str): The subscription name
Returns:
- list: List of network function names
+ list(str): List of network function names
"""
nf_sub_rel = NfSubRelationalModel.query.filter(
NfSubRelationalModel.subscription_name == subscription_name).all()
@@ -170,65 +207,41 @@ class Subscription:
return list_of_nfs
- def update_subscription_status(self):
- """ Updates the status of subscription in subscription table """
- try:
- SubscriptionModel.query.filter(
- SubscriptionModel.subscription_name == self.subscriptionName)\
- .update({SubscriptionModel.status: self.administrativeState},
- synchronize_session='evaluate')
-
- db.session.commit()
- except Exception as e:
- logger.error(f'Failed to update status of subscription: {self.subscriptionName}: {e}',
- exc_info=True)
-
- def delete_subscription(self):
- """ Deletes a subscription and all its association from the database. A network function
- that is only associated with the subscription being removed will also be deleted."""
- try:
- subscription = SubscriptionModel.query.filter(
- SubscriptionModel.subscription_name == self.subscriptionName).one_or_none()
- if subscription:
- for nf_relationship in subscription.nfs:
- other_nf_relationship = NfSubRelationalModel.query.filter(
- NfSubRelationalModel.subscription_name != self.subscriptionName,
- NfSubRelationalModel.nf_name == nf_relationship.nf_name).one_or_none()
- if not other_nf_relationship:
- db.session.delete(nf_relationship.nf)
- db.session.delete(subscription)
- db.session.commit()
- except Exception as e:
- logger.error(f'Failed to delete subscription: {self.subscriptionName} '
- f'and it\'s relations from the DB: {e}', exc_info=True)
-
- def process_subscription(self, nfs, mr_pub, app_conf):
- action = 'Deactivate'
- sub_nf_state = SubNfState.PENDING_DELETE.value
- self.update_subscription_status()
-
- if self.administrativeState == AdministrativeState.UNLOCKED.value:
- action = 'Activate'
- sub_nf_state = SubNfState.PENDING_CREATE.value
- logger.info(f'{action} subscription initiated for {self.subscriptionName}.')
-
+ def activate_subscription(self, nfs, mr_pub, app_conf):
+ logger.info(f'Activate subscription initiated for {self.subscriptionName}.')
try:
+ sub_model = self.get()
for nf in nfs:
mr_pub.publish_subscription_event_data(self, nf.nf_name, app_conf)
- logger.debug(f'Publishing Event to {action} '
- f'Sub: {self.subscriptionName} for the nf: {nf.nf_name}')
- if action == 'Activate':
- self.add_network_function_to_subscription(nf)
- self.update_sub_nf_status(self.subscriptionName, sub_nf_state, nf.nf_name)
+ logger.info(f'Publishing event to activate '
+ f'Sub: {self.subscriptionName} for the nf: {nf.nf_name}')
+ self.add_network_function_to_subscription(nf, sub_model)
+ self.update_sub_nf_status(self.subscriptionName, SubNfState.PENDING_CREATE.value,
+ nf.nf_name)
except Exception as err:
raise Exception(f'Error publishing activation event to MR: {err}')
+ def deactivate_subscription(self, mr_pub, app_conf):
+ nfs = self.get_network_functions()
+ try:
+ if nfs:
+ logger.info(f'Deactivate subscription initiated for {self.subscriptionName}.')
+ for nf in nfs:
+ mr_pub.publish_subscription_event_data(self, nf.nf_name, app_conf)
+ logger.debug(f'Publishing Event to deactivate '
+ f'Sub: {self.subscriptionName} for the nf: {nf.nf_name}')
+ self.update_sub_nf_status(self.subscriptionName,
+ SubNfState.PENDING_DELETE.value,
+ nf.nf_name)
+ except Exception as err:
+ raise Exception(f'Error publishing deactivation event to MR: {err}')
+
@staticmethod
def get_all_nfs_subscription_relations():
""" Retrieves all network function to subscription relations
Returns:
- list: NetworkFunctions per Subscription list else empty
+ list(NfSubRelationalModel): NetworkFunctions per Subscription list else empty
"""
nf_per_subscriptions = NfSubRelationalModel.query.all()
return nf_per_subscriptions
diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py
index 74b6ac88..e74a1732 100644
--- a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py
@@ -22,14 +22,12 @@ from mod.subscription import AdministrativeState
class SubscriptionHandler:
- def __init__(self, administrative_state, mr_pub, app, app_conf, aai_event_thread):
- self.current_nfs = None
- self.current_sub = None
- self.administrative_state = administrative_state
+ def __init__(self, mr_pub, app, app_conf, aai_event_thread, policy_event_thread):
self.mr_pub = mr_pub
self.app = app
self.app_conf = app_conf
self.aai_event_thread = aai_event_thread
+ self.policy_event_thread = policy_event_thread
def execute(self):
"""
@@ -37,25 +35,37 @@ class SubscriptionHandler:
the Subscription if a change has occurred
"""
self.app.app_context().push()
+ local_admin_state = self.app_conf.subscription.get_local_sub_admin_state()
new_administrative_state = self.app_conf.subscription.administrativeState
try:
- if self.administrative_state == new_administrative_state:
+ if local_admin_state == new_administrative_state:
logger.info('Administrative State did not change in the Config')
else:
- self.current_nfs = aai.get_pmsh_nfs_from_aai(self.app_conf)
- self.current_sub = self.app_conf.subscription
- logger.info(f'Administrative State has changed from {self.administrative_state} '
- f'to {new_administrative_state}.')
- self.administrative_state = new_administrative_state
- self.current_sub.process_subscription(self.current_nfs, self.mr_pub, self.app_conf)
-
if new_administrative_state == AdministrativeState.UNLOCKED.value:
- logger.info('Listening to AAI-EVENT topic in MR.')
- self.aai_event_thread.start()
+ self._activate(local_admin_state, new_administrative_state)
+ elif local_admin_state == AdministrativeState.PENDING.value:
+ logger.info('Administrative State is PENDING')
else:
- logger.info('Stop listening to AAI-EVENT topic in MR.')
- self.aai_event_thread.cancel()
-
+ self._deactivate(local_admin_state, new_administrative_state)
except Exception as err:
logger.error(f'Error occurred during the activation/deactivation process {err}',
exc_info=True)
+
+ def _activate(self, local_admin_state, new_administrative_state):
+ logger.info(f'Administrative State has changed from {local_admin_state} '
+ f'to {new_administrative_state}.')
+ existing_nfs_in_aai = aai.get_pmsh_nfs_from_aai(self.app_conf)
+ self.app_conf.subscription.activate_subscription(existing_nfs_in_aai, self.mr_pub,
+ self.app_conf)
+ self.app_conf.subscription.update_subscription_status()
+ logger.info('Start listening for new NFs on AAI-EVENT topic in MR.')
+ self.aai_event_thread.start()
+ self.policy_event_thread.start()
+
+ def _deactivate(self, local_admin_state, new_administrative_state):
+ logger.info(f'Administrative State has changed from {local_admin_state} '
+ f'to {new_administrative_state}.')
+ self.aai_event_thread.cancel()
+ logger.info('Stop listening for NFs on AAI-EVENT topic in MR.')
+ self.app_conf.subscription.deactivate_subscription(self.mr_pub, self.app_conf)
+ self.app_conf.subscription.update_subscription_status()
diff --git a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py
index b3c906d0..6b6b9baa 100755
--- a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py
+++ b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py
@@ -23,7 +23,6 @@ from mod.aai_event_handler import process_aai_events
from mod.exit_handler import ExitHandler
from mod.pmsh_utils import AppConfig, PeriodicTask
from mod.policy_response_handler import PolicyResponseHandler
-from mod.subscription import Subscription, AdministrativeState
from mod.subscription_handler import SubscriptionHandler
@@ -40,27 +39,27 @@ def main():
except Exception as e:
logger.error(f'Failed to get config and create application: {e}', exc_info=True)
sys.exit(e)
- subscription_in_db = Subscription.get(app_conf.subscription.subscriptionName)
- administrative_state = subscription_in_db.status if subscription_in_db \
- else AdministrativeState.LOCKED.value
app_conf_thread = PeriodicTask(10, app_conf.refresh_config)
app_conf_thread.start()
- aai_event_thread = PeriodicTask(10, process_aai_events,
- args=(aai_event_mr_sub, policy_mr_pub, app, app_conf))
- subscription_handler = SubscriptionHandler(administrative_state,
- policy_mr_pub, app, app_conf, aai_event_thread)
+
policy_response_handler = PolicyResponseHandler(policy_mr_sub, app_conf, app)
+ policy_response_handler_thread = PeriodicTask(25, policy_response_handler.poll_policy_topic)
+
+ aai_event_thread = PeriodicTask(20, process_aai_events,
+ args=(aai_event_mr_sub, policy_mr_pub, app, app_conf))
+
+ subscription_handler = SubscriptionHandler(policy_mr_pub, app, app_conf, aai_event_thread,
+ policy_response_handler_thread)
subscription_handler_thread = PeriodicTask(30, subscription_handler.execute)
- policy_response_handler_thread = PeriodicTask(10, policy_response_handler.poll_policy_topic)
subscription_handler_thread.start()
- policy_response_handler_thread.start()
+
periodic_tasks = [app_conf_thread, aai_event_thread, subscription_handler_thread,
policy_response_handler_thread]
signal(SIGTERM, ExitHandler(periodic_tasks=periodic_tasks,
- subscription_handler=subscription_handler))
+ app_conf=app_conf, subscription_handler=subscription_handler))
launch_api_server(app_conf)
except Exception as e: