diff options
Diffstat (limited to 'components/pm-subscription-handler/pmsh_service')
9 files changed, 204 insertions, 160 deletions
diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py index f0f20566..489d035b 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py +++ b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py @@ -23,8 +23,8 @@ import requests from requests.auth import HTTPBasicAuth import mod.pmsh_logging as logger -from mod.network_function import NetworkFunction -from mod.subscription import Subscription, NetworkFunctionFilter +from mod.network_function import NetworkFunction, NetworkFunctionFilter +from mod.subscription import Subscription def get_pmsh_subscription_data(cbs_data): diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py index 9d69e760..ee75fbf5 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py @@ -20,8 +20,7 @@ import json from enum import Enum from mod import pmsh_logging as logger -from mod.network_function import NetworkFunction -from mod.subscription import NetworkFunctionFilter +from mod.network_function import NetworkFunction, NetworkFunctionFilter class XNFType(Enum): diff --git a/components/pm-subscription-handler/pmsh_service/mod/config_handler.py b/components/pm-subscription-handler/pmsh_service/mod/config_handler.py index acf5b76f..26b03153 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/config_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/config_handler.py @@ -15,6 +15,7 @@ # # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END===================================================== + from os import environ import requests diff --git a/components/pm-subscription-handler/pmsh_service/mod/network_function.py b/components/pm-subscription-handler/pmsh_service/mod/network_function.py index 2150dc28..1cdf57a0 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/network_function.py +++ b/components/pm-subscription-handler/pmsh_service/mod/network_function.py @@ -15,6 +15,8 @@ # # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END===================================================== + +import re from mod import pmsh_logging as logger, db from mod.db_models import NetworkFunctionModel @@ -85,3 +87,21 @@ class NetworkFunction: if nf: db.session.delete(nf) db.session.commit() + + +class NetworkFunctionFilter: + def __init__(self, **kwargs): + self.nf_sw_version = kwargs.get('swVersions') + self.nf_names = kwargs.get('nfNames') + self.regex_matcher = re.compile('|'.join(raw_regex for raw_regex in self.nf_names)) + + def is_nf_in_filter(self, nf_name): + """Match the nf name against regex values in Subscription.nfFilter.nfNames + + Args: + nf_name: the AAI nf name. + + Returns: + bool: True if matched, else False. + """ + return self.regex_matcher.search(nf_name) diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py index c8b3bc77..1fc3a097 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py +++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py @@ -15,18 +15,12 @@ # # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END===================================================== -import json -import threading -import uuid -from threading import Timer +import uuid import requests -from requests.auth import HTTPBasicAuth -from tenacity import retry, wait_fixed, retry_if_exception_type - import mod.pmsh_logging as logger -from mod.network_function import NetworkFunction -from mod.subscription import Subscription, SubNfState, AdministrativeState +from requests.auth import HTTPBasicAuth +from threading import Timer class AppConfig: @@ -179,74 +173,6 @@ class _MrSub(_DmaapMrClient): logger.debug(e) return topic_data - @staticmethod - def _handle_response(subscription_name, administrative_state, nf_name, response_message): - """ - Handles the response from Policy, updating the DB - - Args: - subscription_name (str): The subscription name - administrative_state (str): The administrative state of the subscription - nf_name (str): The network function name - response_message (str): The message in the response regarding the state (success|failed) - """ - logger.debug(f'Response from MR: Sub: {subscription_name} for ' - f'NF: {nf_name} received, updating the DB') - try: - sub_nf_status = subscription_nf_states[administrative_state][response_message].value - policy_response_handle_functions[administrative_state][response_message]( - subscription_name=subscription_name, status=sub_nf_status, nf_name=nf_name) - except Exception as err: - raise Exception(f'Error changing nf_sub status in the DB: {err}') - - @retry(wait=wait_fixed(5), retry=retry_if_exception_type(Exception)) - def poll_policy_topic(self, subscription_name, app): - """ - This method polls MR for response from policy. It checks whether the message is for the - relevant subscription and then handles the response - - Args: - subscription_name (str): The subscription name - app (app): Needed to push context for the db - """ - app.app_context().push() - administrative_state = Subscription.get(subscription_name).status - try: - response_data = self.get_from_topic('policy_response_consumer') - for data in response_data: - data = json.loads(data) - if data['status']['subscriptionName'] == subscription_name: - nf_name = data['status']['nfName'] - response_message = data['status']['message'] - self._handle_response(subscription_name, administrative_state, - nf_name, response_message) - threading.Timer(5, self.poll_policy_topic, [subscription_name, app]).start() - except Exception as err: - raise Exception(f'Error trying to poll MR: {err}') - - -subscription_nf_states = { - AdministrativeState.LOCKED.value: { - 'success': SubNfState.CREATED, - 'failed': SubNfState.DELETE_FAILED - }, - AdministrativeState.UNLOCKED.value: { - 'success': SubNfState.CREATED, - 'failed': SubNfState.CREATE_FAILED - } -} - -policy_response_handle_functions = { - AdministrativeState.LOCKED.value: { - 'success': NetworkFunction.delete, - 'failed': Subscription.update_sub_nf_status - }, - AdministrativeState.UNLOCKED.value: { - 'success': Subscription.update_sub_nf_status, - 'failed': Subscription.update_sub_nf_status - } -} - class PeriodicTask(Timer): """ diff --git a/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py b/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py new file mode 100644 index 00000000..aa5a8cb8 --- /dev/null +++ b/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py @@ -0,0 +1,84 @@ +# ============LICENSE_START=================================================== +# Copyright (C) 2020 Nordix Foundation. +# ============================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# ============LICENSE_END===================================================== + +import json + +from tenacity import retry, wait_fixed, retry_if_exception_type + +import mod.pmsh_logging as logger +from mod.network_function import NetworkFunction +from mod.subscription import Subscription, AdministrativeState, subscription_nf_states + + +policy_response_handle_functions = { + AdministrativeState.LOCKED.value: { + 'success': NetworkFunction.delete, + 'failed': Subscription.update_sub_nf_status + }, + AdministrativeState.UNLOCKED.value: { + 'success': Subscription.update_sub_nf_status, + 'failed': Subscription.update_sub_nf_status + } +} + + +class PolicyResponseHandler: + def __init__(self, mr_sub, subscription_name, app): + self.mr_sub = mr_sub + self.subscription_name = subscription_name + self.app = app + + @retry(wait=wait_fixed(5), retry=retry_if_exception_type(Exception)) + def poll_policy_topic(self): + """ + This method polls MR for response from policy. It checks whether the message is for the + relevant subscription and then handles the response + """ + self.app.app_context().push() + administrative_state = Subscription.get(self.subscription_name).status + try: + response_data = self.mr_sub.get_from_topic('policy_response_consumer') + for data in response_data: + data = json.loads(data) + if data['status']['subscriptionName'] == self.subscription_name: + nf_name = data['status']['nfName'] + response_message = data['status']['message'] + self._handle_response(self.subscription_name, administrative_state, + nf_name, response_message) + except Exception as err: + raise Exception(f'Error trying to poll policy response topic on MR: {err}') + + @staticmethod + def _handle_response(subscription_name, administrative_state, nf_name, response_message): + """ + Handles the response from Policy, updating the DB + + Args: + subscription_name (str): The subscription name + administrative_state (str): The administrative state of the subscription + nf_name (str): The network function name + response_message (str): The message in the response regarding the state (success|failed) + """ + logger.debug(f'Response from MR: Sub: {subscription_name} for ' + f'NF: {nf_name} received, updating the DB') + try: + sub_nf_status = subscription_nf_states[administrative_state][response_message].value + policy_response_handle_functions[administrative_state][response_message]( + subscription_name=subscription_name, status=sub_nf_status, nf_name=nf_name) + except Exception as err: + raise Exception(f'Error changing nf_sub status in the DB: {err}') diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription.py b/components/pm-subscription-handler/pmsh_service/mod/subscription.py index 5449f420..99a787da 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/subscription.py +++ b/components/pm-subscription-handler/pmsh_service/mod/subscription.py @@ -15,13 +15,14 @@ # # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END===================================================== -import re + from enum import Enum +from tenacity import retry, retry_if_exception_type, wait_exponential, stop_after_attempt + import mod.pmsh_logging as logger from mod import db from mod.db_models import SubscriptionModel, NfSubRelationalModel -from tenacity import retry, retry_if_exception_type, wait_exponential, stop_after_attempt class SubNfState(Enum): @@ -37,6 +38,18 @@ class AdministrativeState(Enum): LOCKED = 'LOCKED' +subscription_nf_states = { + AdministrativeState.LOCKED.value: { + 'success': SubNfState.CREATED, + 'failed': SubNfState.DELETE_FAILED + }, + AdministrativeState.UNLOCKED.value: { + 'success': SubNfState.CREATED, + 'failed': SubNfState.CREATE_FAILED + } +} + + class Subscription: def __init__(self, **kwargs): self.subscriptionName = kwargs.get('subscriptionName') @@ -205,21 +218,3 @@ class Subscription: update({NfSubRelationalModel.nf_sub_status: status}, synchronize_session='evaluate') db.session.commit() - - -class NetworkFunctionFilter: - def __init__(self, **kwargs): - self.nf_sw_version = kwargs.get('swVersions') - self.nf_names = kwargs.get('nfNames') - self.regex_matcher = re.compile('|'.join(raw_regex for raw_regex in self.nf_names)) - - def is_nf_in_filter(self, nf_name): - """Match the nf name against regex values in Subscription.nfFilter.nfNames - - Args: - nf_name: the AAI nf name. - - Returns: - bool: True if matched, else False. - """ - return self.regex_matcher.search(nf_name) diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py new file mode 100644 index 00000000..a615aa77 --- /dev/null +++ b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py @@ -0,0 +1,59 @@ +# ============LICENSE_START=================================================== +# Copyright (C) 2020 Nordix Foundation. +# ============================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# ============LICENSE_END===================================================== + +import mod.aai_client as aai +import mod.pmsh_logging as logger +from mod.subscription import AdministrativeState + + +class SubscriptionHandler: + def __init__(self, config_handler, administrative_state, mr_pub, + aai_event_thread, app, app_conf): + self.config_handler = config_handler + self.administrative_state = administrative_state + self.mr_pub = mr_pub + self.aai_event_thread = aai_event_thread + self.app = app + self.app_conf = app_conf + + def execute(self): + """ + Checks for changes of administrative state in config and proceeds to process + the Subscription if a change has occurred + """ + self.app.app_context().push() + config = self.config_handler.get_config() + new_administrative_state = config['policy']['subscription']['administrativeState'] + + try: + if self.administrative_state == new_administrative_state: + logger.debug('Administrative State did not change in the Config') + else: + sub, network_functions = aai.get_pmsh_subscription_data(config) + self.administrative_state = new_administrative_state + sub.process_subscription(network_functions, self.mr_pub, self.app_conf) + + if new_administrative_state == AdministrativeState.UNLOCKED.value: + logger.debug('Listening to AAI-EVENT topic in MR.') + self.aai_event_thread.start() + else: + logger.debug('Stop listening to AAI-EVENT topic in MR.') + self.aai_event_thread.cancel() + + except Exception as err: + logger.debug(f'Error occurred during the activation/deactivation process {err}') diff --git a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py index 8245466b..af5aece2 100755 --- a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py +++ b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py @@ -15,61 +15,18 @@ # # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END===================================================== + import sys -import threading import mod.aai_client as aai import mod.pmsh_logging as logger from mod import db, create_app, launch_api_server -from mod.aai_event_handler import process_aai_events from mod.config_handler import ConfigHandler from mod.pmsh_utils import AppConfig, PeriodicTask +from mod.policy_response_handler import PolicyResponseHandler from mod.subscription import Subscription, AdministrativeState - - -def subscription_processor(config_handler, administrative_state, mr_pub, app, - mr_aai_event_subscriber): - """ - Checks for changes of administrative state in config and proceeds to process - the Subscription if a change has occurred - - Args: - config_handler (ConfigHandler): Configuration Handler used to get config - administrative_state (str): The administrative state - mr_pub (_MrPub): MR publisher - app (db): DB application - mr_aai_event_subscriber (_MrSub): AAI events MR subscriber - """ - app.app_context().push() - config = config_handler.get_config() - app_conf = AppConfig(**config['config']) - new_administrative_state = config['policy']['subscription']['administrativeState'] - polling_period = 30.0 - - try: - if administrative_state == new_administrative_state: - logger.debug('Administrative State did not change in the Config') - else: - logger.debug(f'Administrative State changed from "{administrative_state}" "to ' - f'"{new_administrative_state}".') - sub, nfs = aai.get_pmsh_subscription_data(config) - sub.process_subscription(nfs, mr_pub, app_conf) - aai_event_thread = PeriodicTask(10, process_aai_events, args=( - mr_aai_event_subscriber, sub, mr_pub, app, app_conf)) - - if new_administrative_state == AdministrativeState.UNLOCKED.value: - logger.debug('Listening to AAI-EVENT topic in MR.') - aai_event_thread.start() - else: - logger.debug('Stopping to listen to AAI-EVENT topic in MR.') - aai_event_thread.cancel() - - except Exception as err: - logger.debug(f'Error occurred during the activation/deactivation process {err}') - - threading.Timer(polling_period, subscription_processor, - [config_handler, new_administrative_state, mr_pub, app, - mr_aai_event_subscriber]).start() +from mod.subscription_handler import SubscriptionHandler +from mod.aai_event_handler import process_aai_events def main(): @@ -81,21 +38,24 @@ def main(): app.app_context().push() db.create_all(app=app) sub, nfs = aai.get_pmsh_subscription_data(config) - mr_pub = app_conf.get_mr_pub('policy_pm_publisher') - mr_sub = app_conf.get_mr_sub('policy_pm_subscriber') - mr_aai_event_subscriber = app_conf.get_mr_sub('aai_subscriber') - initial_start_delay = 5.0 - - administrative_state = AdministrativeState.LOCKED.value + policy_mr_pub = app_conf.get_mr_pub('policy_pm_publisher') + policy_mr_sub = app_conf.get_mr_sub('policy_pm_subscriber') + mr_aai_event_sub = app_conf.get_mr_sub('aai_subscriber') subscription_in_db = Subscription.get(sub.subscriptionName) - if subscription_in_db is not None: - administrative_state = subscription_in_db.status + administrative_state = subscription_in_db.status if subscription_in_db \ + else AdministrativeState.LOCKED.value + + aai_event_thread = PeriodicTask(10, process_aai_events, + args=(mr_aai_event_sub, sub, policy_mr_pub, app, app_conf)) + subscription_handler = SubscriptionHandler(config_handler, administrative_state, + policy_mr_pub, aai_event_thread, app, app_conf) + policy_response_handler = PolicyResponseHandler(policy_mr_sub, sub.subscriptionName, app) - threading.Timer(initial_start_delay, subscription_processor, - [config_handler, administrative_state, mr_pub, - app, mr_aai_event_subscriber]).start() + subscription_handler_thread = PeriodicTask(30, subscription_handler.execute) + policy_response_handler_thread = PeriodicTask(5, policy_response_handler.poll_policy_topic) - threading.Timer(20.0, mr_sub.poll_policy_topic, [sub.subscriptionName, app]).start() + subscription_handler_thread.start() + policy_response_handler_thread.start() launch_api_server(app_conf) |