diff options
author | ERIMROB <robertas.rimkus@est.tech> | 2020-02-12 11:35:20 +0000 |
---|---|---|
committer | ERIMROB <robertas.rimkus@est.tech> | 2020-02-20 16:03:48 +0000 |
commit | 26b76c02052269ea850d8d4efd6deb536115a0af (patch) | |
tree | f7485d7ccd0e7d95c000b9c05bce2c371c34581a /components/pm-subscription-handler/pmsh_service/mod | |
parent | d42ac06c733c43e19a01b4203c1b987b4973ccfd (diff) |
Add Support for Activation and Deactivation
* Add support for reconfiguration of the administrativeState field
* Add support for policy feedback handling
* Fix network function filter applying to non active network functions
Signed-off-by: ERIMROB <robertas.rimkus@est.tech>
Change-Id: Ic1cfc3207b2495c1d8d10acd0ed1c40114cf4643
Issue-ID: DCAEGEN2-1830
Diffstat (limited to 'components/pm-subscription-handler/pmsh_service/mod')
6 files changed, 176 insertions, 28 deletions
diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py index 747846f1..f0f20566 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py +++ b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py @@ -119,10 +119,12 @@ def _filter_nf_data(nf_data, nf_filter): try: for nf in nf_data['results']: name_identifier = 'pnf-name' if nf['node-type'] == 'pnf' else 'vnf-name' - if nf_filter.is_nf_in_filter(nf['properties'].get(name_identifier)): + orchestration_status = nf['properties'].get('orchestration-status') + if nf_filter.is_nf_in_filter(nf['properties'].get(name_identifier)) \ + and orchestration_status == 'Active': nf_set.add(NetworkFunction( nf_name=nf['properties'].get(name_identifier), - orchestration_status=nf['properties'].get('orchestration-status'))) + orchestration_status=orchestration_status)) except KeyError as e: logger.debug(f'Failed to parse AAI data: {e}') raise diff --git a/components/pm-subscription-handler/pmsh_service/mod/config_handler.py b/components/pm-subscription-handler/pmsh_service/mod/config_handler.py index 1ce4b701..acf5b76f 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/config_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/config_handler.py @@ -15,12 +15,10 @@ # # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END===================================================== - -import json from os import environ import requests -from tenacity import retry, wait_fixed, stop_after_attempt +from tenacity import retry, wait_fixed, stop_after_attempt, retry_if_exception_type import mod.pmsh_logging as logger @@ -45,7 +43,7 @@ class ConfigHandler: def hostname(self): return _get_environment_variable('HOSTNAME') - @retry(wait=wait_fixed(2), stop=stop_after_attempt(5)) + @retry(wait=wait_fixed(2), stop=stop_after_attempt(5), retry=retry_if_exception_type(Exception)) def get_config(self): """ Retrieves PMSH's configuration from Configbinding service. If a non-2xx response is received, it retries after 2 seconds for 5 times before raising an exception. @@ -56,18 +54,15 @@ class ConfigHandler: Raises: Exception: If any error occurred pulling configuration from Configbinding service. """ - if self._config is None: - logger.debug('No configuration found, pulling from Configbinding Service.') - try: - response = requests.get(self.cbs_url) - response.raise_for_status() - self._config = response.json() - logger.debug(f'PMSH Configuration from Configbinding Service: {self._config}') - return json.loads(self._config) - except Exception as err: - raise Exception(f'Error retrieving configuration from CBS: {err}') - else: + + try: + response = requests.get(self.cbs_url) + response.raise_for_status() + self._config = response.json() + logger.debug(f'PMSH Configuration from Configbinding Service: {self._config}') return self._config + except Exception as err: + raise Exception(f'Error retrieving configuration from CBS: {err}') def _get_environment_variable(env_var_key): diff --git a/components/pm-subscription-handler/pmsh_service/mod/network_function.py b/components/pm-subscription-handler/pmsh_service/mod/network_function.py index 64f614af..9f21cc66 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/network_function.py +++ b/components/pm-subscription-handler/pmsh_service/mod/network_function.py @@ -15,16 +15,13 @@ # # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END===================================================== - from mod import pmsh_logging as logger, db from mod.db_models import NetworkFunctionModel class NetworkFunction: def __init__(self, **kwargs): - """ - Object representation of the NetworkFunction. - """ + """ Object representation of the NetworkFunction. """ self.nf_name = kwargs.get('nf_name') self.orchestration_status = kwargs.get('orchestration_status') @@ -36,8 +33,7 @@ class NetworkFunction: return f'nf-name: {self.nf_name}, orchestration-status: {self.orchestration_status}' def create(self): - """ Creates a NetworkFunction database entry - """ + """ Creates a NetworkFunction database entry """ existing_nf = NetworkFunctionModel.query.filter( NetworkFunctionModel.nf_name == self.nf_name).one_or_none() @@ -71,3 +67,13 @@ class NetworkFunction: list: NetworkFunctionModel objects else empty """ return NetworkFunctionModel.query.all() + + @staticmethod + def delete(**kwargs): + """ Deletes a network function from the database """ + nf_name = kwargs['nf_name'] + NetworkFunctionModel.query.filter( + NetworkFunctionModel.nf_name == nf_name). \ + delete(synchronize_session='evaluate') + + db.session.commit() diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_logging.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_logging.py index f2d11d49..885644b4 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_logging.py +++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_logging.py @@ -1,5 +1,5 @@ # ============LICENSE_START=================================================== -# Copyright (C) 2019 Nordix Foundation. +# Copyright (C) 2019-2020 Nordix Foundation. # ============================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,11 +15,10 @@ # # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END===================================================== - +import datetime import logging as log from logging.handlers import RotatingFileHandler from os import makedirs -import datetime # These loggers will be overwritten with EELF logging when running in Docker _AUDIT_LOGGER = log.getLogger("defaultlogger") diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py index b665691d..4a77543b 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py +++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py @@ -15,12 +15,17 @@ # # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END===================================================== +import json +import threading import uuid import requests from requests.auth import HTTPBasicAuth +from tenacity import retry, wait_fixed, retry_if_exception_type import mod.pmsh_logging as logger +from mod.subscription import Subscription, SubNfState, AdministrativeState +from mod.network_function import NetworkFunction class AppConfig: @@ -168,3 +173,71 @@ class _MrSub(_DmaapMrClient): except Exception as e: logger.debug(e) return topic_data + + @staticmethod + def _handle_response(subscription_name, administrative_state, nf_name, response_message): + """ + Handles the response from Policy, updating the DB + + Args: + subscription_name (str): The subscription name + administrative_state (str): The administrative state of the subscription + nf_name (str): The network function name + response_message (str): The message in the response regarding the state (success|failed) + """ + logger.debug(f'Response from MR: Sub: {subscription_name} for ' + f'NF: {nf_name} received, updating the DB') + try: + sub_nf_status = subscription_nf_states[administrative_state][response_message].value + policy_response_handle_functions[administrative_state][response_message]( + subscription_name=subscription_name, status=sub_nf_status, nf_name=nf_name) + except Exception as err: + raise Exception(f'Error changing nf_sub status in the DB: {err}') + + @retry(wait=wait_fixed(5), retry=retry_if_exception_type(Exception)) + def poll_policy_topic(self, subscription_name, app): + """ + This method polls MR for response from policy. It checks whether the message is for the + relevant subscription and then handles the response + + Args: + subscription_name (str): The subscription name + app (app): Needed to push context for the db + """ + app.app_context().push() + administrative_state = Subscription.get(subscription_name).status + try: + response_data = self.get_from_topic('policy_response_consumer') + for data in response_data: + data = json.loads(data) + if data['status']['subscriptionName'] == subscription_name: + nf_name = data['status']['nfName'] + response_message = data['status']['message'] + self._handle_response(subscription_name, administrative_state, + nf_name, response_message) + threading.Timer(5, self.poll_policy_topic, [subscription_name, app]).start() + except Exception as err: + raise Exception(f'Error trying to poll MR: {err}') + + +subscription_nf_states = { + AdministrativeState.LOCKED.value: { + 'success': SubNfState.CREATED, + 'failed': SubNfState.DELETE_FAILED + }, + AdministrativeState.UNLOCKED.value: { + 'success': SubNfState.CREATED, + 'failed': SubNfState.CREATE_FAILED + } +} + +policy_response_handle_functions = { + AdministrativeState.LOCKED.value: { + 'success': NetworkFunction.delete, + 'failed': Subscription.update_sub_nf_status + }, + AdministrativeState.UNLOCKED.value: { + 'success': Subscription.update_sub_nf_status, + 'failed': Subscription.update_sub_nf_status + } +} diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription.py b/components/pm-subscription-handler/pmsh_service/mod/subscription.py index 265d90b8..031609aa 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/subscription.py +++ b/components/pm-subscription-handler/pmsh_service/mod/subscription.py @@ -16,10 +16,25 @@ # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END===================================================== import re +from enum import Enum import mod.pmsh_logging as logger from mod import db from mod.db_models import SubscriptionModel, NfSubRelationalModel +from tenacity import retry, retry_if_exception_type, wait_exponential, stop_after_attempt + + +class SubNfState(Enum): + PENDING_CREATE = 'PENDING_CREATE' + CREATE_FAILED = 'CREATE_FAILED' + CREATED = 'CREATED' + PENDING_DELETE = 'PENDING_DELETE' + DELETE_FAILED = 'DELETE_FAILED' + + +class AdministrativeState(Enum): + UNLOCKED = 'UNLOCKED' + LOCKED = 'LOCKED' class Subscription: @@ -42,7 +57,10 @@ class Subscription: dict: the Subscription event to be published. """ clean_sub = {k: v for k, v in self.__dict__.items() if k != 'nfFilter'} - clean_sub.update({'nfName': xnf_name, 'policyName': f'OP-{self.subscriptionName}'}) + clean_sub.update({'nfName': xnf_name, 'policyName': f'OP-{self.subscriptionName}', + 'changeType': 'DELETE' + if self.administrativeState == AdministrativeState.LOCKED.value + else 'CREATE'}) return clean_sub def create(self): @@ -84,7 +102,8 @@ class Subscription: NfSubRelationalModel.subscription_name == current_sub.subscription_name, NfSubRelationalModel.nf_name == current_nf.nf_name).one_or_none() if existing_entry is None: - new_nf_sub = NfSubRelationalModel(current_sub.subscription_name, nf.nf_name) + new_nf_sub = NfSubRelationalModel(current_sub.subscription_name, + nf.nf_name, SubNfState.PENDING_CREATE.value) new_nf_sub.nf = current_nf logger.debug(current_nf) current_sub.nfs.append(new_nf_sub) @@ -114,6 +133,44 @@ class Subscription: """ return SubscriptionModel.query.all() + def update_subscription_status(self): + """ Updates the status of subscription in subscription table """ + SubscriptionModel.query.filter( + SubscriptionModel.subscription_name == self.subscriptionName). \ + update({SubscriptionModel.status: self.administrativeState}, + synchronize_session='evaluate') + + db.session.commit() + + def delete_subscription(self): + """ Deletes a subscription from the database """ + SubscriptionModel.query.filter( + SubscriptionModel.subscription_name == self.subscriptionName). \ + delete(synchronize_session='evaluate') + + db.session.commit() + + @retry(wait=wait_exponential(multiplier=1, min=30, max=120), stop=stop_after_attempt(3), + retry=retry_if_exception_type(Exception)) + def process_subscription(self, nfs, mr_pub): + action = 'Deactivate' + sub_nf_state = SubNfState.PENDING_DELETE.value + self.update_subscription_status() + + if self.administrativeState == AdministrativeState.UNLOCKED.value: + action = 'Activate' + sub_nf_state = SubNfState.PENDING_CREATE.value + + try: + for nf in nfs: + mr_pub.publish_subscription_event_data(self, nf.nf_name) + logger.debug(f'Publishing Event to {action} ' + f'Sub: {self.subscriptionName} for the nf: {nf.nf_name}') + self.add_network_functions_to_subscription(nfs) + self.update_sub_nf_status(self.subscriptionName, sub_nf_state, nf.nf_name) + except Exception as err: + raise Exception(f'Error publishing activation event to MR: {err}') + @staticmethod def get_all_nfs_subscription_relations(): """ Retrieves all network function to subscription relations @@ -125,6 +182,22 @@ class Subscription: return nf_per_subscriptions + @staticmethod + def update_sub_nf_status(subscription_name, status, nf_name): + """ Updates the status of the subscription for a particular nf + + Args: + subscription_name (str): The subscription name + nf_name (str): The network function name + status (str): Status of the subscription + """ + NfSubRelationalModel.query.filter( + NfSubRelationalModel.subscription_name == subscription_name, + NfSubRelationalModel.nf_name == nf_name). \ + update({NfSubRelationalModel.nf_sub_status: status}, synchronize_session='evaluate') + + db.session.commit() + class NetworkFunctionFilter: def __init__(self, **kwargs): |