# ============LICENSE_START=================================================== # Copyright (C) 2019-2022 Nordix Foundation. # ============================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END===================================================== from enum import Enum from mod import db, logger from mod.api.db_models import SubscriptionModel, NfSubRelationalModel, NetworkFunctionModel class SubNfState(Enum): PENDING_CREATE = 'PENDING_CREATE' CREATE_FAILED = 'CREATE_FAILED' CREATED = 'CREATED' PENDING_DELETE = 'PENDING_DELETE' DELETE_FAILED = 'DELETE_FAILED' DELETED = 'DELETED' class AdministrativeState(Enum): UNLOCKED = 'UNLOCKED' LOCKING = 'LOCKING' LOCKED = 'LOCKED' FILTERING = 'FILTERING' subscription_nf_states = { AdministrativeState.LOCKED.value: { 'success': SubNfState.DELETED, 'failed': SubNfState.DELETE_FAILED }, AdministrativeState.UNLOCKED.value: { 'success': SubNfState.CREATED, 'failed': SubNfState.CREATE_FAILED }, AdministrativeState.LOCKING.value: { 'success': SubNfState.DELETED, 'failed': SubNfState.DELETE_FAILED } } def _get_nf_objects(nf_sub_relationships): nfs = [] for nf_sub_entry in nf_sub_relationships: nf_model_object = NetworkFunctionModel.query.filter( NetworkFunctionModel.nf_name == nf_sub_entry.nf_name).one_or_none() nfs.append(nf_model_object.to_nf()) return nfs class Subscription: def __init__(self, control_loop_name, operational_policy_name, **kwargs): self.subscriptionName = kwargs.get('subscriptionName') self.administrativeState = kwargs.get('administrativeState') self.fileBasedGP = kwargs.get('fileBasedGP') self.fileLocation = kwargs.get('fileLocation') self.nfFilter = kwargs.get('nfFilter') self.measurementGroups = kwargs.get('measurementGroups') self.control_loop_name = control_loop_name self.operational_policy_name = operational_policy_name self.create() def update_sub_params(self, admin_state, file_based_gp, file_location, meas_groups): self.administrativeState = admin_state self.fileBasedGP = file_based_gp self.fileLocation = file_location self.measurementGroups = meas_groups def create(self): """ Creates a subscription database entry Returns: Subscription object """ try: existing_subscription = (SubscriptionModel.query.filter( SubscriptionModel.subscription_name == self.subscriptionName).one_or_none()) if existing_subscription is None: new_subscription = \ SubscriptionModel(subscription_name=self.subscriptionName, operational_policy_name=self.operational_policy_name, control_loop_name=self.control_loop_name, status=AdministrativeState.LOCKED.value) db.session.add(new_subscription) db.session.commit() return new_subscription else: logger.debug(f'Subscription {self.subscriptionName} already exists,' f' returning this subscription..') return existing_subscription except Exception as e: logger.error(f'Failed to create subscription {self.subscriptionName} in the DB: {e}', exc_info=True) finally: db.session.remove() def update_subscription_status(self): """ Updates the status of subscription in subscription table """ try: SubscriptionModel.query.filter( SubscriptionModel.subscription_name == self.subscriptionName) \ .update({SubscriptionModel.status: self.administrativeState}, synchronize_session='evaluate') db.session.commit() except Exception as e: logger.error(f'Failed to update status of subscription: {self.subscriptionName}: {e}', exc_info=True) finally: db.session.remove() def prepare_subscription_event(self, nf): """Prepare the sub event for publishing Args: nf (NetworkFunction): the AAI nf. Returns: dict: the Subscription event to be published. """ try: clean_sub = \ {k: v for k, v in self.__dict__.items() if (k != 'nfFilter' and k != 'control_loop_name' and k != 'operational_policy_name')} if self.administrativeState == AdministrativeState.LOCKING.value: change_type = 'DELETE' else: change_type = 'CREATE' sub_event = { 'nfName': nf.nf_name, 'ipAddress': nf.ipv4_address if nf.ipv6_address in (None, '') else nf.ipv6_address, 'blueprintName': nf.sdnc_model_name, 'blueprintVersion': nf.sdnc_model_version, 'operationalPolicyName': self.operational_policy_name, 'changeType': change_type, 'controlLoopName': self.control_loop_name, 'subscription': clean_sub} return sub_event except Exception as e: logger.error(f'Failed to prep Sub event for xNF {nf.nf_name}: {e}', exc_info=True) raise def add_network_function_to_subscription(self, nf, sub_model): """ Associates a network function to a Subscription Args: sub_model(SubscriptionModel): The SubscriptionModel from the DB. nf(NetworkFunction): A NetworkFunction object. """ try: current_nf = nf.create() existing_entry = NfSubRelationalModel.query.filter( NfSubRelationalModel.subscription_name == self.subscriptionName, NfSubRelationalModel.nf_name == current_nf.nf_name).one_or_none() if existing_entry is None: new_nf_sub = NfSubRelationalModel(self.subscriptionName, nf.nf_name, SubNfState.PENDING_CREATE.value) sub_model.nfs.append(new_nf_sub) db.session.add(sub_model) db.session.commit() logger.info(f'Network function {current_nf.nf_name} added to Subscription ' f'{self.subscriptionName}') except Exception as e: logger.error(f'Failed to add nf {nf.nf_name} to subscription ' f'{self.subscriptionName}: {e}', exc_info=True) logger.debug(f'Subscription {self.subscriptionName} now contains these XNFs:' f'{[nf.nf_name for nf.nf_name in self.get_network_functions()]}') def get(self): """ Retrieves a SubscriptionModel object Returns: SubscriptionModel object else None """ sub_model = SubscriptionModel.query.filter( SubscriptionModel.subscription_name == self.subscriptionName).one_or_none() return sub_model def get_local_sub_admin_state(self): """ Retrieves the subscription admin state Returns: str: The admin state of the SubscriptionModel """ sub_model = SubscriptionModel.query.filter( SubscriptionModel.subscription_name == self.subscriptionName).one_or_none() db.session.remove() return sub_model.status def create_subscription_on_nfs(self, nfs, mr_pub): """ Publishes an event to create a Subscription on an nf Args: nfs(list[NetworkFunction]): A list of NetworkFunction Objects. mr_pub (_MrPub): MR publisher """ try: existing_nfs = self.get_network_functions() sub_model = self.get() for nf in [new_nf for new_nf in nfs if new_nf not in existing_nfs]: logger.info(f'Publishing event to create ' f'Sub: {self.subscriptionName} on nf: {nf.nf_name}') mr_pub.publish_subscription_event_data(self, nf) self.add_network_function_to_subscription(nf, sub_model) self.update_sub_nf_status(self.subscriptionName, SubNfState.PENDING_CREATE.value, nf.nf_name) except Exception as err: raise Exception(f'Error publishing create event to MR: {err}') def delete_subscription_from_nfs(self, nfs, mr_pub): """ Publishes an event to delete a Subscription from an nf Args: nfs(list[NetworkFunction]): A list of NetworkFunction Objects. mr_pub (_MrPub): MR publisher """ try: for nf in nfs: logger.debug(f'Publishing Event to delete ' f'Sub: {self.subscriptionName} from the nf: {nf.nf_name}') mr_pub.publish_subscription_event_data(self, nf) self.update_sub_nf_status(self.subscriptionName, SubNfState.PENDING_DELETE.value, nf.nf_name) except Exception as err: raise Exception(f'Error publishing delete event to MR: {err}') @staticmethod def get_all_nfs_subscription_relations(): """ Retrieves all network function to subscription relations Returns: list(NfSubRelationalModel): NetworkFunctions per Subscription list else empty """ nf_per_subscriptions = NfSubRelationalModel.query.all() db.session.remove() return nf_per_subscriptions @staticmethod def update_sub_nf_status(subscription_name, status, nf_name): """ Updates the status of the subscription for a particular nf Args: subscription_name (str): The subscription name nf_name (str): The network function name status (str): Status of the subscription """ try: NfSubRelationalModel.query.filter( NfSubRelationalModel.subscription_name == subscription_name, NfSubRelationalModel.nf_name == nf_name). \ update({NfSubRelationalModel.nf_sub_status: status}, synchronize_session='evaluate') db.session.commit() except Exception as e: logger.error(f'Failed to update status of nf: {nf_name} for subscription: ' f'{subscription_name}: {e}', exc_info=True) def get_network_functions(self): nf_sub_relationships = NfSubRelationalModel.query.filter( NfSubRelationalModel.subscription_name == self.subscriptionName) nfs = _get_nf_objects(nf_sub_relationships) db.session.remove() return nfs def get_delete_failed_nfs(self): nf_sub_relationships = NfSubRelationalModel.query.filter( NfSubRelationalModel.subscription_name == self.subscriptionName, NfSubRelationalModel.nf_sub_status == SubNfState.DELETE_FAILED.value) nfs = _get_nf_objects(nf_sub_relationships) db.session.remove() return nfs def get_delete_pending_nfs(self): nf_sub_relationships = NfSubRelationalModel.query.filter( NfSubRelationalModel.subscription_name == self.subscriptionName, NfSubRelationalModel.nf_sub_status == SubNfState.PENDING_DELETE.value) nfs = _get_nf_objects(nf_sub_relationships) db.session.remove() return nfs