From 80ff14860e3b8a7a2c29272c2c10c1e830c2141d Mon Sep 17 00:00:00 2001 From: efiacor Date: Tue, 9 Jun 2020 19:20:22 +0100 Subject: [PMSH] Improve CBS data handling # AppConfog object fetch priodically # AAI client to only fetch AAI data Signed-off-by: efiacor Change-Id: I78315f141c3bb7e8b0d9efa818d294415fa79918 Issue-ID: DCAEGEN2-2146 --- .../pmsh_service/mod/__init__.py | 4 +- .../pmsh_service/mod/aai_client.py | 29 +++++----- .../pmsh_service/mod/aai_event_handler.py | 21 +++---- .../pmsh_service/mod/network_function.py | 4 +- .../pmsh_service/mod/pmsh_utils.py | 65 +++++++++++++++++----- .../pmsh_service/mod/policy_response_handler.py | 15 ++--- .../pmsh_service/mod/subscription.py | 5 +- .../pmsh_service/mod/subscription_handler.py | 7 +-- .../pmsh_service/pmsh_service_main.py | 19 +++---- 9 files changed, 98 insertions(+), 71 deletions(-) (limited to 'components/pm-subscription-handler/pmsh_service') diff --git a/components/pm-subscription-handler/pmsh_service/mod/__init__.py b/components/pm-subscription-handler/pmsh_service/mod/__init__.py index 316687c0..efc61aae 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/__init__.py +++ b/components/pm-subscription-handler/pmsh_service/mod/__init__.py @@ -59,7 +59,7 @@ def create_app(): def create_logger(): config_file_path = os.getenv('LOGGER_CONFIG') - update_config(config_file_path) + update_logging_config(config_file_path) monkey.patch_loggingYaml() logging.config.yamlConfig(filepath=config_file_path, watchDog=os.getenv('DYNAMIC_LOGGER_CONFIG', True)) @@ -73,7 +73,7 @@ def create_logger(): logging.setLogRecordFactory(augment_record) -def update_config(config_file_path): +def update_logging_config(config_file_path): config_yaml = YAML() config_file = pathlib.Path(config_file_path) data = config_yaml.load(config_file) diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py index 371fdb06..2b92df41 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py +++ b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py @@ -22,39 +22,39 @@ import requests from requests.auth import HTTPBasicAuth from mod import logger +from mod.network_function import NetworkFunction from mod.pmsh_utils import mdc_handler -from mod.network_function import NetworkFunction, NetworkFunctionFilter -from mod.subscription import Subscription -def get_pmsh_subscription_data(cbs_data): +def get_pmsh_nfs_from_aai(app_conf): """ - Returns the PMSH subscription data + Returns the Network Functions from AAI related to the Subscription. Args: - cbs_data: json app config from the Config Binding Service. + app_conf (AppConfig): the AppConfig object. Returns: - Subscription, set(NetworkFunctions): `Subscription` object, - set of NetworkFunctions to be added. + set(NetworkFunctions): set of NetworkFunctions. Raises: - RuntimeError: if AAI data cannot be retrieved. + RuntimeError: if AAI Network Function data cannot be retrieved. """ - aai_nf_data = _get_all_aai_nf_data() + aai_nf_data = _get_all_aai_nf_data(app_conf) if aai_nf_data: - sub = Subscription(**cbs_data['policy']['subscription']) - nfs = _filter_nf_data(aai_nf_data, NetworkFunctionFilter(**sub.nfFilter)) + nfs = _filter_nf_data(aai_nf_data, app_conf.nf_filter) else: raise RuntimeError('Failed to get data from AAI') - return sub, nfs + return nfs @mdc_handler -def _get_all_aai_nf_data(**kwargs): +def _get_all_aai_nf_data(app_conf, **kwargs): """ Return queried nf data from the AAI service. + Args: + app_conf (AppConfig): the AppConfig object. + Returns: dict: the json response from AAI query, else None. """ @@ -77,7 +77,8 @@ def _get_all_aai_nf_data(**kwargs): }""" params = {'format': 'simple', 'nodesOnly': 'true'} response = session.put(aai_endpoint, headers=headers, - auth=HTTPBasicAuth('AAI', 'AAI'), + auth=HTTPBasicAuth(app_conf.aaf_creds.get('aaf_id'), + app_conf.aaf_creds.get('aaf_pass')), data=json_data, params=params, verify=False) response.raise_for_status() if response.ok: diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py index 5aebb926..96f51431 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py @@ -20,8 +20,7 @@ import json from enum import Enum from mod import logger -from mod.network_function import NetworkFunction, NetworkFunctionFilter -from mod.subscription import AdministrativeState +from mod.network_function import NetworkFunction class XNFType(Enum): @@ -34,13 +33,12 @@ class AAIEvent(Enum): UPDATE = 'UPDATE' -def process_aai_events(mr_sub, subscription, mr_pub, app, app_conf): +def process_aai_events(mr_sub, mr_pub, app, app_conf): """ Processes AAI UPDATE events for each filtered xNFs where orchestration status is set to Active. Args: mr_sub (_MrSub): MR subscriber - subscription (Subscription): The current subscription object mr_pub (_MrPub): MR publisher app (db): DB application app_conf (AppConfig): the application configuration. @@ -48,7 +46,7 @@ def process_aai_events(mr_sub, subscription, mr_pub, app, app_conf): app.app_context().push() aai_events = mr_sub.get_from_topic('AAI-EVENT') - if _aai_event_exists(aai_events): + if aai_events is not None and len(aai_events) != 0: for entry in aai_events: logger.debug(f'AAI-EVENT entry: {entry}') entry = json.loads(entry) @@ -60,19 +58,18 @@ def process_aai_events(mr_sub, subscription, mr_pub, app, app_conf): 'vnf-name'] new_status = aai_xnf['orchestration-status'] - if NetworkFunctionFilter(**subscription.nfFilter).is_nf_in_filter(xnf_name, new_status): - _process_event(action, new_status, xnf_name, subscription, mr_pub, app_conf) + if app_conf.nf_filter.is_nf_in_filter(xnf_name, new_status): + _process_event(action, new_status, xnf_name, mr_pub, app_conf) -def _process_event(action, new_status, xnf_name, subscription, mr_pub, app_conf): +def _process_event(action, new_status, xnf_name, mr_pub, app_conf): if action == AAIEvent.UPDATE.value: logger.info(f'Update event found for network function {xnf_name}') local_xnf = NetworkFunction.get(xnf_name) if local_xnf is None: logger.info(f'Activating subscription for network function {xnf_name}') - subscription.administrativeState = AdministrativeState.UNLOCKED.value - subscription.process_subscription([NetworkFunction( + app_conf.subscription.process_subscription([NetworkFunction( nf_name=xnf_name, orchestration_status=new_status)], mr_pub, app_conf) else: logger.debug(f"Update Event for network function {xnf_name} will not be processed " @@ -81,7 +78,3 @@ def _process_event(action, new_status, xnf_name, subscription, mr_pub, app_conf) logger.info(f'Delete event found for network function {xnf_name}') NetworkFunction.delete(nf_name=xnf_name) logger.info(f'{xnf_name} successfully deleted.') - - -def _aai_event_exists(aai_events): - return aai_events is not None and len(aai_events) != 0 diff --git a/components/pm-subscription-handler/pmsh_service/mod/network_function.py b/components/pm-subscription-handler/pmsh_service/mod/network_function.py index aa39bf2c..979cc775 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/network_function.py +++ b/components/pm-subscription-handler/pmsh_service/mod/network_function.py @@ -102,8 +102,8 @@ class NetworkFunctionFilter: """Match the nf name against regex values in Subscription.nfFilter.nfNames Args: - nf_name: the AAI nf name. - orchestration_status: orchestration status of the nf + nf_name (str): the AAI nf name. + orchestration_status (str): orchestration status of the nf Returns: bool: True if matched, else False. diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py index 01661ad0..fb6a5194 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py +++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py @@ -15,6 +15,7 @@ # # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END===================================================== +import threading import uuid from os import getenv from threading import Timer @@ -26,6 +27,8 @@ from requests.auth import HTTPBasicAuth from tenacity import wait_fixed, stop_after_attempt, retry, retry_if_exception_type from mod import logger +from mod.network_function import NetworkFunctionFilter +from mod.subscription import Subscription def mdc_handler(function): @@ -42,12 +45,40 @@ def mdc_handler(function): return decorator -class ConfigHandler: - """ Handles retrieval of PMSH's configuration from Configbinding service.""" - @staticmethod +class ThreadSafeSingleton(type): + _instances = {} + _singleton_lock = threading.Lock() + + def __call__(cls, *args, **kwargs): + # double-checked locking pattern (https://en.wikipedia.org/wiki/Double-checked_locking) + if cls not in cls._instances: + with cls._singleton_lock: + if cls not in cls._instances: + cls._instances[cls] = super(ThreadSafeSingleton, cls).__call__(*args, **kwargs) + return cls._instances[cls] + + +class AppConfig(metaclass=ThreadSafeSingleton): + + def __init__(self): + try: + conf = self._get_pmsh_config() + except Exception: + raise + self.aaf_creds = {'aaf_id': conf['config'].get('aaf_identity'), + 'aaf_pass': conf['config'].get('aaf_password')} + self.cert_path = conf['config'].get('cert_path') + self.key_path = conf['config'].get('key_path') + self.streams_subscribes = conf['config'].get('streams_subscribes') + self.streams_publishes = conf['config'].get('streams_publishes') + self.operational_policy_name = conf['config'].get('operational_policy_name') + self.control_loop_name = conf['config'].get('control_loop_name') + self.subscription = Subscription(**conf['policy']['subscription']) + self.nf_filter = NetworkFunctionFilter(**self.subscription.nfFilter) + @mdc_handler @retry(wait=wait_fixed(2), stop=stop_after_attempt(5), retry=retry_if_exception_type(Exception)) - def get_pmsh_config(**kwargs): + def _get_pmsh_config(self, **kwargs): """ Retrieves PMSH's configuration from Config binding service. If a non-2xx response is received, it retries after 2 seconds for 5 times before raising an exception. @@ -66,17 +97,23 @@ class ConfigHandler: logger.error(f'Failed to get config from CBS: {err}') raise Exception + def refresh_config(self): + """ + Update the relevant attributes of the AppConfig object. -class AppConfig: - def __init__(self, **kwargs): - self.aaf_creds = {'aaf_id': kwargs.get('aaf_identity'), - 'aaf_pass': kwargs.get('aaf_password')} - self.cert_path = kwargs.get('cert_path') - self.key_path = kwargs.get('key_path') - self.streams_subscribes = kwargs.get('streams_subscribes') - self.streams_publishes = kwargs.get('streams_publishes') - self.operational_policy_name = kwargs.get('operational_policy_name') - self.control_loop_name = kwargs.get('control_loop_name') + Raises: + Exception: if cbs request fails. + """ + try: + app_conf = self._get_pmsh_config() + except Exception: + logger.debug("Failed to refresh AppConfig data") + raise + self.subscription.administrativeState = \ + app_conf['policy']['subscription']['administrativeState'] + self.nf_filter.nf_names = app_conf['policy']['subscription']['nfFilter']['nfNames'] + self.nf_filter.nf_sw_version = app_conf['policy']['subscription']['nfFilter']['swVersions'] + logger.info("AppConfig data has been refreshed") def get_mr_sub(self, sub_name): """ diff --git a/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py b/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py index 5ce03691..2b917cec 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py @@ -37,9 +37,9 @@ policy_response_handle_functions = { class PolicyResponseHandler: - def __init__(self, mr_sub, subscription_name, app): + def __init__(self, mr_sub, app_conf, app): self.mr_sub = mr_sub - self.subscription_name = subscription_name + self.app_conf = app_conf self.app = app @retry(wait=wait_fixed(5), retry=retry_if_exception_type(Exception)) @@ -48,18 +48,19 @@ class PolicyResponseHandler: This method polls MR for response from policy. It checks whether the message is for the relevant subscription and then handles the response """ - logger.info('Polling MR started for XNF activation/deactivation policy response events.') self.app.app_context().push() - administrative_state = Subscription.get(self.subscription_name).status + administrative_state = self.app_conf.subscription.administrativeState + logger.info('Polling MR started for XNF activation/deactivation policy response events.') try: response_data = self.mr_sub.get_from_topic('policy_response_consumer') for data in response_data: data = json.loads(data) - if data['status']['subscriptionName'] == self.subscription_name: + if data['status']['subscriptionName'] \ + == self.app_conf.subscription.subscriptionName: nf_name = data['status']['nfName'] response_message = data['status']['message'] - self._handle_response(self.subscription_name, administrative_state, - nf_name, response_message) + self._handle_response(self.app_conf.subscription.subscriptionName, + administrative_state, nf_name, response_message) except Exception as err: raise Exception(f'Error trying to poll policy response topic on MR: {err}') diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription.py b/components/pm-subscription-handler/pmsh_service/mod/subscription.py index be217b11..d6b17cd9 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/subscription.py +++ b/components/pm-subscription-handler/pmsh_service/mod/subscription.py @@ -125,7 +125,7 @@ class Subscription: logger.debug(f'Failed to add nf {nf.nf_name} to subscription ' f'{current_sub.subscription_name}: {e}') logger.debug(f'Subscription {current_sub.subscription_name} now contains these XNFs:' - f'{Subscription.get_nfs_per_subscription(current_sub.subscription_name)}') + f'{Subscription.get_nf_names_per_sub(current_sub.subscription_name)}') @staticmethod def get(subscription_name): @@ -206,9 +206,9 @@ class Subscription: self.update_subscription_status() if self.administrativeState == AdministrativeState.UNLOCKED.value: - logger.info(f'{action} subscription initiated for {self.subscriptionName}.') action = 'Activate' sub_nf_state = SubNfState.PENDING_CREATE.value + logger.info(f'{action} subscription initiated for {self.subscriptionName}.') try: for nf in nfs: @@ -228,7 +228,6 @@ class Subscription: list: NetworkFunctions per Subscription list else empty """ nf_per_subscriptions = NfSubRelationalModel.query.all() - return nf_per_subscriptions @staticmethod diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py index be67cae6..add8be42 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py @@ -17,7 +17,6 @@ # ============LICENSE_END===================================================== import mod.aai_client as aai -from mod.pmsh_utils import ConfigHandler from mod import logger from mod.subscription import AdministrativeState @@ -38,15 +37,15 @@ class SubscriptionHandler: the Subscription if a change has occurred """ self.app.app_context().push() - config = ConfigHandler.get_pmsh_config() - new_administrative_state = config['policy']['subscription']['administrativeState'] + new_administrative_state = self.app_conf.subscription.administrativeState try: if self.administrative_state == new_administrative_state: logger.info('Administrative State did not change in the Config') else: logger.info(f'Administrative State has changed from {self.administrative_state} ' f'to {new_administrative_state}.') - self.current_sub, self.current_nfs = aai.get_pmsh_subscription_data(config) + self.current_nfs = aai.get_pmsh_nfs_from_aai(self.app_conf) + self.current_sub = self.app_conf.subscription self.administrative_state = new_administrative_state self.current_sub.process_subscription(self.current_nfs, self.mr_pub, self.app_conf) diff --git a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py index f1fb1e06..143b5c8c 100755 --- a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py +++ b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py @@ -18,11 +18,10 @@ import sys from signal import signal, SIGTERM -import mod.aai_client as aai from mod import db, create_app, launch_api_server, logger from mod.aai_event_handler import process_aai_events from mod.exit_handler import ExitHandler -from mod.pmsh_utils import AppConfig, PeriodicTask, ConfigHandler +from mod.pmsh_utils import AppConfig, PeriodicTask from mod.policy_response_handler import PolicyResponseHandler from mod.subscription import Subscription, AdministrativeState from mod.subscription_handler import SubscriptionHandler @@ -33,29 +32,27 @@ def main(): app = create_app() app.app_context().push() db.create_all(app=app) - config = ConfigHandler.get_pmsh_config() - app_conf = AppConfig(**config['config']) - - sub, nfs = aai.get_pmsh_subscription_data(config) + app_conf = AppConfig() policy_mr_pub = app_conf.get_mr_pub('policy_pm_publisher') policy_mr_sub = app_conf.get_mr_sub('policy_pm_subscriber') mr_aai_event_sub = app_conf.get_mr_sub('aai_subscriber') - subscription_in_db = Subscription.get(sub.subscriptionName) + subscription_in_db = Subscription.get(app_conf.subscription.subscriptionName) administrative_state = subscription_in_db.status if subscription_in_db \ else AdministrativeState.LOCKED.value + app_conf_thread = PeriodicTask(10, app_conf.refresh_config) + app_conf_thread.start() aai_event_thread = PeriodicTask(10, process_aai_events, - args=(mr_aai_event_sub, - sub, policy_mr_pub, app, app_conf)) + args=(mr_aai_event_sub, policy_mr_pub, app, app_conf)) subscription_handler = SubscriptionHandler(administrative_state, policy_mr_pub, app, app_conf, aai_event_thread) - policy_response_handler = PolicyResponseHandler(policy_mr_sub, sub.subscriptionName, app) + policy_response_handler = PolicyResponseHandler(policy_mr_sub, app_conf, app) subscription_handler_thread = PeriodicTask(30, subscription_handler.execute) policy_response_handler_thread = PeriodicTask(5, policy_response_handler.poll_policy_topic) subscription_handler_thread.start() policy_response_handler_thread.start() - periodic_tasks = [aai_event_thread, subscription_handler_thread, + periodic_tasks = [app_conf_thread, aai_event_thread, subscription_handler_thread, policy_response_handler_thread] signal(SIGTERM, ExitHandler(periodic_tasks=periodic_tasks, -- cgit 1.2.3-korg