diff options
Diffstat (limited to 'components/pm-subscription-handler/pmsh_service/mod')
5 files changed, 181 insertions, 33 deletions
diff --git a/components/pm-subscription-handler/pmsh_service/mod/api/db_models.py b/components/pm-subscription-handler/pmsh_service/mod/api/db_models.py index a9dd6efe..4501282e 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/api/db_models.py +++ b/components/pm-subscription-handler/pmsh_service/mod/api/db_models.py @@ -1,5 +1,5 @@ # ============LICENSE_START=================================================== -# Copyright (C) 2019-2020 Nordix Foundation. +# Copyright (C) 2019-2021 Nordix Foundation. # ============================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,7 +16,7 @@ # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END===================================================== -from sqlalchemy import Column, Integer, String, ForeignKey +from sqlalchemy import Column, Integer, String, ForeignKey, JSON from sqlalchemy.orm import relationship from mod import db @@ -26,6 +26,7 @@ class SubscriptionModel(db.Model): __tablename__ = 'subscriptions' id = Column(Integer, primary_key=True, autoincrement=True) subscription_name = Column(String(100), unique=True) + nfFilter = Column(JSON) status = Column(String(20)) nfs = relationship( @@ -33,12 +34,15 @@ class SubscriptionModel(db.Model): cascade='all, delete-orphan', backref='subscription') - def __init__(self, subscription_name, status): + def __init__(self, subscription_name, nfFilter, status): self.subscription_name = subscription_name + self.nfFilter = nfFilter self.status = status def __repr__(self): - return f'subscription_name: {self.subscription_name}, status: {self.status}' + return f'subscription_name: {self.subscription_name}, ' \ + f'nfFilter: {self.nfFilter}, ' \ + f'status: {self.status}' def __eq__(self, other): if isinstance(self, other.__class__): @@ -49,7 +53,9 @@ class SubscriptionModel(db.Model): sub_nfs = NfSubRelationalModel.query.filter( NfSubRelationalModel.subscription_name == self.subscription_name).all() db.session.remove() - return {'subscription_name': self.subscription_name, 'subscription_status': self.status, + return {'subscription_name': self.subscription_name, + 'nfFilter': self.nfFilter, + 'subscription_status': self.status, 'network_functions': [sub_nf.serialize_nf() for sub_nf in sub_nfs]} diff --git a/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py b/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py index 09c97047..ec331791 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py @@ -1,5 +1,5 @@ # ============LICENSE_START=================================================== -# Copyright (C) 2020 Nordix Foundation. +# Copyright (C) 2020-2021 Nordix Foundation. # ============================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -31,6 +31,10 @@ policy_response_handle_functions = { 'success': Subscription.update_sub_nf_status, 'failed': Subscription.update_sub_nf_status }, + AdministrativeState.FILTERING.value: { + 'success': NetworkFunction.delete, + 'failed': Subscription.update_sub_nf_status + }, AdministrativeState.LOCKING.value: { 'success': NetworkFunction.delete, 'failed': Subscription.update_sub_nf_status @@ -38,6 +42,19 @@ policy_response_handle_functions = { } +def _set_sub_nf_status(administrative_state, response_message): + """ + + Args: + administrative_state (str): The administrative state of the subscription + response_message (str): The message in the response regarding the state (success|failed) + + Returns: + str: sub_nf_status + """ + return subscription_nf_states[administrative_state][response_message].value; + + class PolicyResponseHandler: def __init__(self, mr_sub, app_conf, app): self.mr_sub = mr_sub @@ -60,13 +77,14 @@ class PolicyResponseHandler: == self.app_conf.subscription.subscriptionName: nf_name = data['status']['nfName'] response_message = data['status']['message'] + change_type = data['status']['changeType'] self._handle_response(self.app_conf.subscription.subscriptionName, - administrative_state, nf_name, response_message) + administrative_state, nf_name, response_message, change_type) except Exception as err: logger.error(f'Error trying to poll policy response topic on MR: {err}', exc_info=True) @staticmethod - def _handle_response(subscription_name, administrative_state, nf_name, response_message): + def _handle_response(subscription_name, administrative_state, nf_name, response_message, change_type): """ Handles the response from Policy, updating the DB @@ -79,9 +97,14 @@ class PolicyResponseHandler: logger.info(f'Response from MR: Sub: {subscription_name} for ' f'NF: {nf_name} received, updating the DB') try: + if administrative_state == AdministrativeState.UNLOCKED.value and change_type == "DELETED": + administrative_state = AdministrativeState.FILTERING.value sub_nf_status = subscription_nf_states[administrative_state][response_message].value policy_response_handle_functions[administrative_state][response_message]( - subscription_name=subscription_name, status=sub_nf_status, nf_name=nf_name) + subscription_name=subscription_name, status=sub_nf_status, nf_name=nf_name) except Exception as err: logger.error(f'Error changing nf_sub status in the DB: {err}') raise + + + diff --git a/components/pm-subscription-handler/pmsh_service/mod/sub_schema.json b/components/pm-subscription-handler/pmsh_service/mod/sub_schema.json index 18d48174..3ce81f26 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/sub_schema.json +++ b/components/pm-subscription-handler/pmsh_service/mod/sub_schema.json @@ -12,7 +12,8 @@ { "enum":[ "UNLOCKED", - "LOCKED" + "LOCKED", + "FILTERING" ] } ] diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription.py b/components/pm-subscription-handler/pmsh_service/mod/subscription.py index fdc1394c..9e721c5f 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/subscription.py +++ b/components/pm-subscription-handler/pmsh_service/mod/subscription.py @@ -15,6 +15,7 @@ # # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END===================================================== +import json from enum import Enum from mod import db, logger @@ -34,6 +35,7 @@ class AdministrativeState(Enum): UNLOCKED = 'UNLOCKED' LOCKING = 'LOCKING' LOCKED = 'LOCKED' + FILTERING = 'FILTERING' subscription_nf_states = { @@ -48,6 +50,10 @@ subscription_nf_states = { AdministrativeState.LOCKING.value: { 'success': SubNfState.DELETED, 'failed': SubNfState.DELETE_FAILED + }, + AdministrativeState.FILTERING.value: { + 'success': SubNfState.DELETED, + 'failed': SubNfState.DELETE_FAILED } } @@ -61,6 +67,27 @@ def _get_nf_objects(nf_sub_relationships): return nfs +def get_nfs_for_creation_and_deletion(existing_nfs, new_nfs, action, mrpub, app_conf): + """ Finds new/old nfs for creation/deletion from subscription """ + for existing_nf in existing_nfs: + _compare_nfs(action, app_conf, existing_nf, mrpub, new_nfs) + + +def _compare_nfs(action, app_conf, existing_nf, mrpub, new_nfs): + """ Compares old nfs list to existing nfs list""" + for new_nf in new_nfs: + if existing_nf.nf_name != new_nf.nf_name: + _apply_action_to_nfs(action, app_conf, existing_nf, mrpub, new_nf) + + +def _apply_action_to_nfs(action, app_conf, existing_nf, mrpub, new_nf): + """ Performs create/delete of nf from subscription as required""" + if action == 'create': + app_conf.subscription.create_subscription_on_nfs([new_nf], mrpub, app_conf) + elif action == 'delete': + app_conf.subscription.delete_subscription_from_nfs([existing_nf], mrpub, app_conf) + + class Subscription: def __init__(self, **kwargs): self.subscriptionName = kwargs.get('subscriptionName') @@ -83,30 +110,58 @@ class Subscription: Returns: Subscription object """ + existing_subscription = (SubscriptionModel.query.filter( + SubscriptionModel.subscription_name == self.subscriptionName).one_or_none()) + if existing_subscription is None: + return self.create_new_sub() + else: + if existing_subscription.nfFilter and \ + self._filter_diff(existing_subscription.nfFilter) and \ + existing_subscription.status == AdministrativeState.UNLOCKED.value: + return self.update_existing_sub(existing_subscription) + + def update_existing_sub(self, existing_subscription): + """Update subscription status + + Args: + existing_subscription: the current subscription + + Returns: + Subscription: updated subscription + """ + self.administrativeState = \ + AdministrativeState.FILTERING.value + self.nfFilter = existing_subscription.nfFilter + self.update_subscription_status() + logger.debug(f'Subscription {self.subscriptionName} already exists,' + f' returning this subscription..') + return existing_subscription + + def create_new_sub(self): try: - existing_subscription = (SubscriptionModel.query.filter( - SubscriptionModel.subscription_name == self.subscriptionName).one_or_none()) - if existing_subscription is None: - new_subscription = SubscriptionModel(subscription_name=self.subscriptionName, - status=AdministrativeState.LOCKED.value) - db.session.add(new_subscription) - db.session.commit() - return new_subscription - else: - logger.debug(f'Subscription {self.subscriptionName} already exists,' - f' returning this subscription..') - return existing_subscription + new_subscription = SubscriptionModel(subscription_name=self.subscriptionName, + nfFilter=self.nfFilter, + status=AdministrativeState.LOCKED.value) + db.session.add(new_subscription) + db.session.commit() + return new_subscription except Exception as e: logger.error(f'Failed to create subscription {self.subscriptionName} in the DB: {e}', exc_info=True) finally: db.session.remove() + def _filter_diff(self, existing_subscription_filter): + existing_subscription_filter, nf_filter = \ + json.dumps(existing_subscription_filter, sort_keys=True), \ + json.dumps(self.nfFilter, sort_keys=True) + return existing_subscription_filter != nf_filter + def update_subscription_status(self): """ Updates the status of subscription in subscription table """ try: SubscriptionModel.query.filter( - SubscriptionModel.subscription_name == self.subscriptionName)\ + SubscriptionModel.subscription_name == self.subscriptionName) \ .update({SubscriptionModel.status: self.administrativeState}, synchronize_session='evaluate') @@ -117,6 +172,21 @@ class Subscription: finally: db.session.remove() + def update_subscription_filter(self): + """ Updates the filter of subscription in subscription table """ + try: + SubscriptionModel.query.filter( + SubscriptionModel.subscription_name == self.subscriptionName) \ + .update({SubscriptionModel.nfFilter: self.nfFilter}, + synchronize_session='evaluate') + + db.session.commit() + except Exception as e: + logger.error(f'Failed to update status of subscription: {self.subscriptionName}: {e}', + exc_info=True) + finally: + db.session.remove() + def prepare_subscription_event(self, nf, app_conf): """Prepare the sub event for publishing @@ -128,13 +198,14 @@ class Subscription: dict: the Subscription event to be published. """ try: - clean_sub = {k: v for k, v in self.__dict__.items() if k != 'nfFilter'} + clean_sub = {k: v for k, v in self.__dict__.items() + if k != 'nfFilter' and k != 'current_filter'} if self.administrativeState == AdministrativeState.LOCKING.value: change_type = 'DELETE' else: change_type = 'CREATE' sub_event = {'nfName': nf.nf_name, - 'ipv4Address': nf.ip_address, + 'ipAddress': nf.ip_address, 'blueprintName': nf.sdnc_model_name, 'blueprintVersion': nf.sdnc_model_version, 'policyName': app_conf.operational_policy_name, diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py index 6238a298..ffe8ef50 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py @@ -1,5 +1,5 @@ # ============LICENSE_START=================================================== -# Copyright (C) 2020-2021 Nordix Foundation. +# Copyright (C) 2019-2021 Nordix Foundation. # ============================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,10 +18,11 @@ from jsonschema import ValidationError from mod import logger, aai_client +from mod.aai_client import _filter_nf_data, get_pmsh_nfs_from_aai from mod.aai_event_handler import process_aai_events from mod.network_function import NetworkFunctionFilter from mod.pmsh_utils import PeriodicTask -from mod.subscription import AdministrativeState +from mod.subscription import AdministrativeState, Subscription, get_nfs_for_creation_and_deletion class SubscriptionHandler: @@ -45,12 +46,8 @@ class SubscriptionHandler: else: self.app_conf.refresh_config() self.app_conf.validate_sub_schema() - new_administrative_state = self.app_conf.subscription.administrativeState - if local_admin_state == new_administrative_state: - logger.info(f'Administrative State did not change in the app config: ' - f'{new_administrative_state}') - else: - self._check_state_change(local_admin_state, new_administrative_state) + local_admin_state = self.apply_subscription_changes() + self.compare_admin_state(local_admin_state) except (ValidationError, TypeError) as err: logger.error(f'Error occurred during validation of subscription schema {err}', exc_info=True) @@ -58,6 +55,56 @@ class SubscriptionHandler: logger.error(f'Error occurred during the activation/deactivation process {err}', exc_info=True) + def apply_subscription_changes(self): + """ Applies changes to subscription + + Args: + local_admin_state(enum): The local adminstrative state + + Returns: + Enum: Updated administrative state + """ + local_admin_state = self.app_conf.subscription.get_local_sub_admin_state() + if local_admin_state == AdministrativeState.FILTERING.value: + existing_nfs = self.app_conf.subscription.get_network_functions() + self.app_conf.nf_filter = \ + NetworkFunctionFilter(**self.app_conf.subscription.nfFilter) + new_nfs = get_pmsh_nfs_from_aai(self.app_conf) + self.app_conf.subscription.update_subscription_filter() + get_nfs_for_creation_and_deletion(existing_nfs, new_nfs, 'delete', + self.mr_pub, self.app_conf) + get_nfs_for_creation_and_deletion(existing_nfs, new_nfs, 'create', + self.mr_pub, self.app_conf) + + return local_admin_state + + def compare_admin_state(self, local_admin_state): + """ Check for changes in administrative state + + Args: + local_admin_state(enum): + + """ + new_administrative_state = self.app_conf.subscription.administrativeState + if local_admin_state == new_administrative_state: + logger.info(f'Administrative State did not change in the app config: ' + f'{new_administrative_state}') + else: + self._check_state_change(local_admin_state, new_administrative_state) + + def fetch_aai_nf_data(self): + """ Fetchs AAI data + + Returns: + dict: the json response from AAI query after filter is applied + """ + aai_nf_data = aai_client._get_all_aai_nf_data(self.app_conf) + if aai_nf_data: + new_nfs = _filter_nf_data(aai_nf_data, self.app_conf) + else: + raise RuntimeError('Failed to get data from AAI') + return new_nfs + def _check_state_change(self, local_admin_state, new_administrative_state): if new_administrative_state == AdministrativeState.UNLOCKED.value: logger.info(f'Administrative State has changed from {local_admin_state} ' |