From 26b76c02052269ea850d8d4efd6deb536115a0af Mon Sep 17 00:00:00 2001 From: ERIMROB Date: Wed, 12 Feb 2020 11:35:20 +0000 Subject: Add Support for Activation and Deactivation * Add support for reconfiguration of the administrativeState field * Add support for policy feedback handling * Fix network function filter applying to non active network functions Signed-off-by: ERIMROB Change-Id: Ic1cfc3207b2495c1d8d10acd0ed1c40114cf4643 Issue-ID: DCAEGEN2-1830 --- .../pmsh_service/mod/aai_client.py | 6 +- .../pmsh_service/mod/config_handler.py | 25 +++--- .../pmsh_service/mod/network_function.py | 18 +++-- .../pmsh_service/mod/pmsh_logging.py | 5 +- .../pmsh_service/mod/pmsh_utils.py | 73 +++++++++++++++++ .../pmsh_service/mod/subscription.py | 77 +++++++++++++++++- .../pmsh_service/pmsh_service.py | 49 ----------- .../pmsh_service/pmsh_service_main.py | 94 ++++++++++++++++++++++ 8 files changed, 270 insertions(+), 77 deletions(-) delete mode 100755 components/pm-subscription-handler/pmsh_service/pmsh_service.py create mode 100755 components/pm-subscription-handler/pmsh_service/pmsh_service_main.py (limited to 'components/pm-subscription-handler/pmsh_service') diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py index 747846f1..f0f20566 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py +++ b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py @@ -119,10 +119,12 @@ def _filter_nf_data(nf_data, nf_filter): try: for nf in nf_data['results']: name_identifier = 'pnf-name' if nf['node-type'] == 'pnf' else 'vnf-name' - if nf_filter.is_nf_in_filter(nf['properties'].get(name_identifier)): + orchestration_status = nf['properties'].get('orchestration-status') + if nf_filter.is_nf_in_filter(nf['properties'].get(name_identifier)) \ + and orchestration_status == 'Active': nf_set.add(NetworkFunction( nf_name=nf['properties'].get(name_identifier), - orchestration_status=nf['properties'].get('orchestration-status'))) + orchestration_status=orchestration_status)) except KeyError as e: logger.debug(f'Failed to parse AAI data: {e}') raise diff --git a/components/pm-subscription-handler/pmsh_service/mod/config_handler.py b/components/pm-subscription-handler/pmsh_service/mod/config_handler.py index 1ce4b701..acf5b76f 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/config_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/config_handler.py @@ -15,12 +15,10 @@ # # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END===================================================== - -import json from os import environ import requests -from tenacity import retry, wait_fixed, stop_after_attempt +from tenacity import retry, wait_fixed, stop_after_attempt, retry_if_exception_type import mod.pmsh_logging as logger @@ -45,7 +43,7 @@ class ConfigHandler: def hostname(self): return _get_environment_variable('HOSTNAME') - @retry(wait=wait_fixed(2), stop=stop_after_attempt(5)) + @retry(wait=wait_fixed(2), stop=stop_after_attempt(5), retry=retry_if_exception_type(Exception)) def get_config(self): """ Retrieves PMSH's configuration from Configbinding service. If a non-2xx response is received, it retries after 2 seconds for 5 times before raising an exception. @@ -56,18 +54,15 @@ class ConfigHandler: Raises: Exception: If any error occurred pulling configuration from Configbinding service. """ - if self._config is None: - logger.debug('No configuration found, pulling from Configbinding Service.') - try: - response = requests.get(self.cbs_url) - response.raise_for_status() - self._config = response.json() - logger.debug(f'PMSH Configuration from Configbinding Service: {self._config}') - return json.loads(self._config) - except Exception as err: - raise Exception(f'Error retrieving configuration from CBS: {err}') - else: + + try: + response = requests.get(self.cbs_url) + response.raise_for_status() + self._config = response.json() + logger.debug(f'PMSH Configuration from Configbinding Service: {self._config}') return self._config + except Exception as err: + raise Exception(f'Error retrieving configuration from CBS: {err}') def _get_environment_variable(env_var_key): diff --git a/components/pm-subscription-handler/pmsh_service/mod/network_function.py b/components/pm-subscription-handler/pmsh_service/mod/network_function.py index 64f614af..9f21cc66 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/network_function.py +++ b/components/pm-subscription-handler/pmsh_service/mod/network_function.py @@ -15,16 +15,13 @@ # # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END===================================================== - from mod import pmsh_logging as logger, db from mod.db_models import NetworkFunctionModel class NetworkFunction: def __init__(self, **kwargs): - """ - Object representation of the NetworkFunction. - """ + """ Object representation of the NetworkFunction. """ self.nf_name = kwargs.get('nf_name') self.orchestration_status = kwargs.get('orchestration_status') @@ -36,8 +33,7 @@ class NetworkFunction: return f'nf-name: {self.nf_name}, orchestration-status: {self.orchestration_status}' def create(self): - """ Creates a NetworkFunction database entry - """ + """ Creates a NetworkFunction database entry """ existing_nf = NetworkFunctionModel.query.filter( NetworkFunctionModel.nf_name == self.nf_name).one_or_none() @@ -71,3 +67,13 @@ class NetworkFunction: list: NetworkFunctionModel objects else empty """ return NetworkFunctionModel.query.all() + + @staticmethod + def delete(**kwargs): + """ Deletes a network function from the database """ + nf_name = kwargs['nf_name'] + NetworkFunctionModel.query.filter( + NetworkFunctionModel.nf_name == nf_name). \ + delete(synchronize_session='evaluate') + + db.session.commit() diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_logging.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_logging.py index f2d11d49..885644b4 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_logging.py +++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_logging.py @@ -1,5 +1,5 @@ # ============LICENSE_START=================================================== -# Copyright (C) 2019 Nordix Foundation. +# Copyright (C) 2019-2020 Nordix Foundation. # ============================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,11 +15,10 @@ # # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END===================================================== - +import datetime import logging as log from logging.handlers import RotatingFileHandler from os import makedirs -import datetime # These loggers will be overwritten with EELF logging when running in Docker _AUDIT_LOGGER = log.getLogger("defaultlogger") diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py index b665691d..4a77543b 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py +++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py @@ -15,12 +15,17 @@ # # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END===================================================== +import json +import threading import uuid import requests from requests.auth import HTTPBasicAuth +from tenacity import retry, wait_fixed, retry_if_exception_type import mod.pmsh_logging as logger +from mod.subscription import Subscription, SubNfState, AdministrativeState +from mod.network_function import NetworkFunction class AppConfig: @@ -168,3 +173,71 @@ class _MrSub(_DmaapMrClient): except Exception as e: logger.debug(e) return topic_data + + @staticmethod + def _handle_response(subscription_name, administrative_state, nf_name, response_message): + """ + Handles the response from Policy, updating the DB + + Args: + subscription_name (str): The subscription name + administrative_state (str): The administrative state of the subscription + nf_name (str): The network function name + response_message (str): The message in the response regarding the state (success|failed) + """ + logger.debug(f'Response from MR: Sub: {subscription_name} for ' + f'NF: {nf_name} received, updating the DB') + try: + sub_nf_status = subscription_nf_states[administrative_state][response_message].value + policy_response_handle_functions[administrative_state][response_message]( + subscription_name=subscription_name, status=sub_nf_status, nf_name=nf_name) + except Exception as err: + raise Exception(f'Error changing nf_sub status in the DB: {err}') + + @retry(wait=wait_fixed(5), retry=retry_if_exception_type(Exception)) + def poll_policy_topic(self, subscription_name, app): + """ + This method polls MR for response from policy. It checks whether the message is for the + relevant subscription and then handles the response + + Args: + subscription_name (str): The subscription name + app (app): Needed to push context for the db + """ + app.app_context().push() + administrative_state = Subscription.get(subscription_name).status + try: + response_data = self.get_from_topic('policy_response_consumer') + for data in response_data: + data = json.loads(data) + if data['status']['subscriptionName'] == subscription_name: + nf_name = data['status']['nfName'] + response_message = data['status']['message'] + self._handle_response(subscription_name, administrative_state, + nf_name, response_message) + threading.Timer(5, self.poll_policy_topic, [subscription_name, app]).start() + except Exception as err: + raise Exception(f'Error trying to poll MR: {err}') + + +subscription_nf_states = { + AdministrativeState.LOCKED.value: { + 'success': SubNfState.CREATED, + 'failed': SubNfState.DELETE_FAILED + }, + AdministrativeState.UNLOCKED.value: { + 'success': SubNfState.CREATED, + 'failed': SubNfState.CREATE_FAILED + } +} + +policy_response_handle_functions = { + AdministrativeState.LOCKED.value: { + 'success': NetworkFunction.delete, + 'failed': Subscription.update_sub_nf_status + }, + AdministrativeState.UNLOCKED.value: { + 'success': Subscription.update_sub_nf_status, + 'failed': Subscription.update_sub_nf_status + } +} diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription.py b/components/pm-subscription-handler/pmsh_service/mod/subscription.py index 265d90b8..031609aa 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/subscription.py +++ b/components/pm-subscription-handler/pmsh_service/mod/subscription.py @@ -16,10 +16,25 @@ # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END===================================================== import re +from enum import Enum import mod.pmsh_logging as logger from mod import db from mod.db_models import SubscriptionModel, NfSubRelationalModel +from tenacity import retry, retry_if_exception_type, wait_exponential, stop_after_attempt + + +class SubNfState(Enum): + PENDING_CREATE = 'PENDING_CREATE' + CREATE_FAILED = 'CREATE_FAILED' + CREATED = 'CREATED' + PENDING_DELETE = 'PENDING_DELETE' + DELETE_FAILED = 'DELETE_FAILED' + + +class AdministrativeState(Enum): + UNLOCKED = 'UNLOCKED' + LOCKED = 'LOCKED' class Subscription: @@ -42,7 +57,10 @@ class Subscription: dict: the Subscription event to be published. """ clean_sub = {k: v for k, v in self.__dict__.items() if k != 'nfFilter'} - clean_sub.update({'nfName': xnf_name, 'policyName': f'OP-{self.subscriptionName}'}) + clean_sub.update({'nfName': xnf_name, 'policyName': f'OP-{self.subscriptionName}', + 'changeType': 'DELETE' + if self.administrativeState == AdministrativeState.LOCKED.value + else 'CREATE'}) return clean_sub def create(self): @@ -84,7 +102,8 @@ class Subscription: NfSubRelationalModel.subscription_name == current_sub.subscription_name, NfSubRelationalModel.nf_name == current_nf.nf_name).one_or_none() if existing_entry is None: - new_nf_sub = NfSubRelationalModel(current_sub.subscription_name, nf.nf_name) + new_nf_sub = NfSubRelationalModel(current_sub.subscription_name, + nf.nf_name, SubNfState.PENDING_CREATE.value) new_nf_sub.nf = current_nf logger.debug(current_nf) current_sub.nfs.append(new_nf_sub) @@ -114,6 +133,44 @@ class Subscription: """ return SubscriptionModel.query.all() + def update_subscription_status(self): + """ Updates the status of subscription in subscription table """ + SubscriptionModel.query.filter( + SubscriptionModel.subscription_name == self.subscriptionName). \ + update({SubscriptionModel.status: self.administrativeState}, + synchronize_session='evaluate') + + db.session.commit() + + def delete_subscription(self): + """ Deletes a subscription from the database """ + SubscriptionModel.query.filter( + SubscriptionModel.subscription_name == self.subscriptionName). \ + delete(synchronize_session='evaluate') + + db.session.commit() + + @retry(wait=wait_exponential(multiplier=1, min=30, max=120), stop=stop_after_attempt(3), + retry=retry_if_exception_type(Exception)) + def process_subscription(self, nfs, mr_pub): + action = 'Deactivate' + sub_nf_state = SubNfState.PENDING_DELETE.value + self.update_subscription_status() + + if self.administrativeState == AdministrativeState.UNLOCKED.value: + action = 'Activate' + sub_nf_state = SubNfState.PENDING_CREATE.value + + try: + for nf in nfs: + mr_pub.publish_subscription_event_data(self, nf.nf_name) + logger.debug(f'Publishing Event to {action} ' + f'Sub: {self.subscriptionName} for the nf: {nf.nf_name}') + self.add_network_functions_to_subscription(nfs) + self.update_sub_nf_status(self.subscriptionName, sub_nf_state, nf.nf_name) + except Exception as err: + raise Exception(f'Error publishing activation event to MR: {err}') + @staticmethod def get_all_nfs_subscription_relations(): """ Retrieves all network function to subscription relations @@ -125,6 +182,22 @@ class Subscription: return nf_per_subscriptions + @staticmethod + def update_sub_nf_status(subscription_name, status, nf_name): + """ Updates the status of the subscription for a particular nf + + Args: + subscription_name (str): The subscription name + nf_name (str): The network function name + status (str): Status of the subscription + """ + NfSubRelationalModel.query.filter( + NfSubRelationalModel.subscription_name == subscription_name, + NfSubRelationalModel.nf_name == nf_name). \ + update({NfSubRelationalModel.nf_sub_status: status}, synchronize_session='evaluate') + + db.session.commit() + class NetworkFunctionFilter: def __init__(self, **kwargs): diff --git a/components/pm-subscription-handler/pmsh_service/pmsh_service.py b/components/pm-subscription-handler/pmsh_service/pmsh_service.py deleted file mode 100755 index c564a5e3..00000000 --- a/components/pm-subscription-handler/pmsh_service/pmsh_service.py +++ /dev/null @@ -1,49 +0,0 @@ -# ============LICENSE_START=================================================== -# Copyright (C) 2019-2020 Nordix Foundation. -# ============================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# SPDX-License-Identifier: Apache-2.0 -# ============LICENSE_END===================================================== -import sys -import time - -import mod.aai_client as aai_client -import mod.pmsh_logging as logger -from mod import db, create_app -from mod.config_handler import ConfigHandler -from mod.subscription import Subscription - - -def main(): - - try: - app = create_app() - app.app_context().push() - db.create_all(app=app) - - config_handler = ConfigHandler() - cbs_data = config_handler.get_config() - subscription, xnfs = aai_client.get_pmsh_subscription_data(cbs_data) - subscription.add_network_functions_to_subscription(xnfs) - except Exception as e: - logger.debug(f'Failed to Init PMSH: {e}') - sys.exit(e) - - while True: - logger.debug(Subscription.get_all_nfs_subscription_relations()) - time.sleep(5) - - -if __name__ == '__main__': - main() diff --git a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py new file mode 100755 index 00000000..ab330320 --- /dev/null +++ b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py @@ -0,0 +1,94 @@ +# ============LICENSE_START=================================================== +# Copyright (C) 2019-2020 Nordix Foundation. +# ============================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# ============LICENSE_END===================================================== +import sys +import time +import threading + +import mod.aai_client as aai +import mod.pmsh_logging as logger +from mod import db, create_app +from mod.config_handler import ConfigHandler +from mod.pmsh_utils import AppConfig +from mod.subscription import Subscription, AdministrativeState + + +def subscription_processor(config_handler, administrative_state, mr_pub, app): + """ + Checks for changes of administrative state in config and proceeds to process + the Subscription if a change has occurred + + Args: + config_handler (ConfigHandler): Configuration Handler used to get config + administrative_state (str): The administrative state + mr_pub (_MrPub): MR publisher + app (db): DB application + """ + app.app_context().push() + config = config_handler.get_config() + sub, nfs = aai.get_pmsh_subscription_data(config) + new_administrative_state = config['policy']['subscription']['administrativeState'] + polling_period = 30.0 + + try: + if administrative_state == new_administrative_state: + logger.debug('Administrative State did not change in the Config') + else: + sub.process_subscription(nfs, mr_pub) + + except Exception as err: + logger.debug(f'Error occurred during the activation/deactivation process {err}') + + threading.Timer(polling_period, subscription_processor, + [config_handler, new_administrative_state, mr_pub, app]).start() + + +def main(): + + try: + config_handler = ConfigHandler() + config = config_handler.get_config() + app_conf = AppConfig(**config['config']) + app = create_app() + app.app_context().push() + db.create_all(app=app) + sub, nfs = aai.get_pmsh_subscription_data(config) + mr_pub = app_conf.get_mr_pub('policy_pm_publisher') + mr_sub = app_conf.get_mr_sub('policy_pm_subscriber') + initial_start_delay = 5.0 + + administrative_state = AdministrativeState.LOCKED.value + subscription_in_db = Subscription.get(sub.subscriptionName) + if subscription_in_db is not None: + administrative_state = subscription_in_db.status + + threading.Timer(initial_start_delay, subscription_processor, + [config_handler, administrative_state, mr_pub, app]).start() + + threading.Timer(20.0, mr_sub.poll_policy_topic, [sub.subscriptionName, app]).start() + + except Exception as e: + logger.debug(f'Failed to Init PMSH: {e}') + sys.exit(e) + + while True: + logger.debug(Subscription.get_all_nfs_subscription_relations()) + time.sleep(5) + + +if __name__ == '__main__': + main() -- cgit 1.2.3-korg