From 1685b700294ee0c1eaa1365d67a42861af0fe721 Mon Sep 17 00:00:00 2001 From: "raviteja.karumuri" Date: Sun, 31 Oct 2021 19:09:19 +0000 Subject: [PMSH] Operational policy and control loop updates Issue-ID: DCAEGEN2-2913 Signed-off-by: raviteja.karumuri Change-Id: Ie64383aa55b07ef4387e9b1e8a2414f37901f0da --- .../pmsh_service/mod/aai_event_handler.py | 4 +- .../pmsh_service/mod/api/db_models.py | 16 ++++++-- .../mod/api/services/measurement_group_service.py | 9 +++-- .../pmsh_service/mod/api/services/nf_service.py | 8 ++-- .../mod/api/services/subscription_service.py | 46 +++++++++++++++------- .../pmsh_service/mod/pmsh_config.py | 3 -- .../pmsh_service/mod/pmsh_utils.py | 16 +++++--- .../pmsh_service/mod/subscription.py | 33 +++++++++------- .../pmsh_service/mod/subscription_handler.py | 8 ++-- 9 files changed, 88 insertions(+), 55 deletions(-) (limited to 'components/pm-subscription-handler/pmsh_service/mod') diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py index 6fc4ba90..a00c164a 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py @@ -1,5 +1,5 @@ # ============LICENSE_START=================================================== -# Copyright (C) 2020 Nordix Foundation. +# Copyright (C) 2020-2021 Nordix Foundation. # ============================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -78,7 +78,7 @@ def process_aai_events(mr_sub, mr_pub, app, app_conf): def _process_event(action, nf, mr_pub, app_conf): if action == AAIEvent.UPDATE.value: logger.info(f'Update event found for network function {nf.nf_name}') - app_conf.subscription.create_subscription_on_nfs([nf], mr_pub, app_conf) + app_conf.subscription.create_subscription_on_nfs([nf], mr_pub) elif action == AAIEvent.DELETE.value: logger.info(f'Delete event found for network function {nf.nf_name}') NetworkFunction.delete(nf_name=nf.nf_name) diff --git a/components/pm-subscription-handler/pmsh_service/mod/api/db_models.py b/components/pm-subscription-handler/pmsh_service/mod/api/db_models.py index 2b340e24..49ca0581 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/api/db_models.py +++ b/components/pm-subscription-handler/pmsh_service/mod/api/db_models.py @@ -26,6 +26,8 @@ class SubscriptionModel(db.Model): __tablename__ = 'subscriptions' id = Column(Integer, primary_key=True, autoincrement=True) subscription_name = Column(String(100), unique=True, nullable=False) + operational_policy_name = Column(String(80), nullable=False) + control_loop_name = Column(String(80)) status = Column(String(20)) nfs = relationship( @@ -43,12 +45,17 @@ class SubscriptionModel(db.Model): cascade='all, delete-orphan', backref='subscription') - def __init__(self, subscription_name, status): + def __init__(self, subscription_name, operational_policy_name, control_loop_name, status): self.subscription_name = subscription_name + self.operational_policy_name = operational_policy_name + self.control_loop_name = control_loop_name self.status = status def __repr__(self): - return f'subscription_name: {self.subscription_name}, status: {self.status}' + return f'subscription_name: {self.subscription_name},' \ + f'operational_policy_name: {self.operational_policy_name},' \ + f'control_loop_name: {self.control_loop_name},' \ + f'status: {self.status}' def __eq__(self, other): if isinstance(self, other.__class__): @@ -59,7 +66,10 @@ class SubscriptionModel(db.Model): sub_nfs = NfSubRelationalModel.query.filter( NfSubRelationalModel.subscription_name == self.subscription_name).all() db.session.remove() - return {'subscription_name': self.subscription_name, 'subscription_status': self.status, + return {'subscription_name': self.subscription_name, + 'operational_policy_name': self.operational_policy_name, + 'control_loop_name': self.control_loop_name, + 'subscription_status': self.status, 'network_functions': [sub_nf.serialize_nf() for sub_nf in sub_nfs]} diff --git a/components/pm-subscription-handler/pmsh_service/mod/api/services/measurement_group_service.py b/components/pm-subscription-handler/pmsh_service/mod/api/services/measurement_group_service.py index 329dc857..733d803e 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/api/services/measurement_group_service.py +++ b/components/pm-subscription-handler/pmsh_service/mod/api/services/measurement_group_service.py @@ -63,19 +63,19 @@ def apply_nf_to_measgroup(nf_name, measurement_group_name): db.session.add(new_nf_measure_grp_rel) -def publish_measurement_group(subscription_name, measurement_group, nf): +def publish_measurement_group(sub_model, measurement_group, nf): """ Publishes an event for measurement group against nfs to MR Args: - subscription_name (string): Subscription name to publish against nf + sub_model(SubscriptionModel): Subscription model object measurement_group (MeasurementGroupModel): Measurement group to publish nf (NetworkFunction): Network function to publish. """ - event_body = nf_service.create_nf_event_body(nf, 'CREATE') + event_body = nf_service.create_nf_event_body(nf, 'CREATE', sub_model) event_body['subscription'] = { "administrativeState": measurement_group.administrative_state, - "subscriptionName": subscription_name, + "subscriptionName": sub_model.subscription_name, "fileBasedGP": measurement_group.file_based_gp, "fileLocation": measurement_group.file_location, "measurementGroup": { @@ -84,4 +84,5 @@ def publish_measurement_group(subscription_name, measurement_group, nf): "managedObjectDNsBasic": measurement_group.managed_object_dns_basic } } + logger.debug(f'Event Body: {event_body}') AppConfig.get_instance().publish_to_topic(MRTopic.POLICY_PM_PUBLISHER.value, event_body) diff --git a/components/pm-subscription-handler/pmsh_service/mod/api/services/nf_service.py b/components/pm-subscription-handler/pmsh_service/mod/api/services/nf_service.py index 1fca766a..6d431473 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/api/services/nf_service.py +++ b/components/pm-subscription-handler/pmsh_service/mod/api/services/nf_service.py @@ -37,25 +37,25 @@ def capture_filtered_nfs(sub_name): return aai_client.get_pmsh_nfs_from_aai(AppConfig.get_instance(), nf_filter) -def create_nf_event_body(nf, change_type): +def create_nf_event_body(nf, change_type, sub_model): """ Creates a network function event body to publish on MR Args: nf (NetworkFunction): the Network function to include in the event. change_type (string): define the change type to be applied on node + sub_model(SubscriptionModel): Subscription model object Returns: dict: network function event body to publish on MR. """ - app_conf = AppConfig.get_instance() return {'nfName': nf.nf_name, 'ipAddress': nf.ipv4_address if nf.ipv6_address in (None, '') else nf.ipv6_address, 'blueprintName': nf.sdnc_model_name, 'blueprintVersion': nf.sdnc_model_version, - 'policyName': app_conf.operational_policy_name, + 'operationalPolicyName': sub_model.operational_policy_name, 'changeType': change_type, - 'closedLoopControlName': app_conf.control_loop_name} + 'controlLoopName': sub_model.control_loop_name} def save_nf(nf): diff --git a/components/pm-subscription-handler/pmsh_service/mod/api/services/subscription_service.py b/components/pm-subscription-handler/pmsh_service/mod/api/services/subscription_service.py index 1485bebe..ea1640c2 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/api/services/subscription_service.py +++ b/components/pm-subscription-handler/pmsh_service/mod/api/services/subscription_service.py @@ -39,24 +39,28 @@ def create_subscription(subscription): logger.info(f'Initiating create subscription for: {subscription["subscriptionName"]}') perform_validation(subscription) try: - sub_name, measurement_groups = save_subscription_request(subscription) + sub_model, measurement_groups = save_subscription_request(subscription) db.session.commit() logger.info(f'Successfully saved subscription request for: ' f'{subscription["subscriptionName"]}') - filtered_nfs = nf_service.capture_filtered_nfs(sub_name) + filtered_nfs = nf_service.capture_filtered_nfs(sub_model.subscription_name) if filtered_nfs: - logger.info(f'Applying the filtered nfs for subscription: {sub_name}') + logger.info(f'Applying the filtered nfs for subscription: ' + f'{sub_model.subscription_name}') save_filtered_nfs(filtered_nfs) - apply_subscription_to_nfs(filtered_nfs, sub_name) + apply_subscription_to_nfs(filtered_nfs, sub_model.subscription_name) unlocked_msmt_groups = apply_measurement_grp_to_nfs(filtered_nfs, measurement_groups) db.session.commit() if unlocked_msmt_groups: - publish_measurement_grp_to_nfs(sub_name, filtered_nfs, unlocked_msmt_groups) + publish_measurement_grp_to_nfs(sub_model, filtered_nfs, + unlocked_msmt_groups) else: - logger.error(f'All measurement groups are locked for subscription: {sub_name}, ' + logger.error(f'All measurement groups are locked for subscription: ' + f'{sub_model.subscription_name}, ' f'please verify/check measurement groups.') else: - logger.error(f'No network functions found for subscription: {sub_name}, ' + logger.error(f'No network functions found for subscription: ' + f'{sub_model.subscription_name}, ' f'please verify/check NetworkFunctionFilter.') except IntegrityError as e: db.session.rollback() @@ -68,12 +72,13 @@ def create_subscription(subscription): db.session.remove() -def publish_measurement_grp_to_nfs(subscription_name, filtered_nfs, measurement_groups): +def publish_measurement_grp_to_nfs(sub_model, filtered_nfs, + measurement_groups): """ Publishes an event for measurement groups against nfs Args: - subscription_name (string): subscription name against nfs + sub_model(SubscriptionModel): Subscription model object filtered_nfs (list[NetworkFunction])): list of filtered network functions measurement_groups (list[MeasurementGroupModel]): list of unlocked measurement group """ @@ -83,11 +88,11 @@ def publish_measurement_grp_to_nfs(subscription_name, filtered_nfs, measurement_ logger.info(f'Publishing event for nf name, measure_grp_name: {nf.nf_name},' f'{measurement_group.measurement_group_name}') measurement_group_service.publish_measurement_group( - subscription_name, measurement_group, nf) + sub_model, measurement_group, nf) except Exception as ex: logger.error(f'Publish event failed for nf name, measure_grp_name, sub_name: ' f'{nf.nf_name},{measurement_group.measurement_group_name}, ' - f'{subscription_name} with error: {ex}') + f'{sub_model.subscription_name} with error: {ex}') def save_filtered_nfs(filtered_nfs): @@ -157,6 +162,8 @@ def check_missing_data(subscription): """ if subscription['subscriptionName'].strip() in (None, ''): raise InvalidDataException("No value provided in subscription name") + if subscription['operationalPolicyName'].strip() in (None, ''): + raise InvalidDataException("Value required for operational Policy Name") for measurement_group in subscription.get('measurementGroups'): measurement_group_details = measurement_group['measurementGroup'] @@ -200,7 +207,7 @@ def save_subscription_request(subscription): list[MeasurementGroupModel]: list of measurement groups """ logger.info(f'Saving subscription request for: {subscription["subscriptionName"]}') - sub_name = save_subscription(subscription).subscription_name + sub_model = save_subscription(subscription) save_nf_filter(subscription["nfFilter"], subscription["subscriptionName"]) measurement_groups = [] for measurement_group in subscription['measurementGroups']: @@ -208,7 +215,7 @@ def save_subscription_request(subscription): measurement_group_service.save_measurement_group( measurement_group['measurementGroup'], subscription["subscriptionName"])) - return sub_name, measurement_groups + return sub_model, measurement_groups def check_duplicate_fields(subscription_name): @@ -235,9 +242,18 @@ def save_subscription(subscription): Args: subscription (dict): subscription model to be saved. + Returns: + subscription_model(SubscriptionModel): subscription model + which is added to the session """ - subscription_model = SubscriptionModel(subscription_name=subscription["subscriptionName"], - status=AdministrativeState.LOCKED.value) + control_loop_name = "" + if 'controlLoopName' in subscription: + control_loop_name = subscription['controlLoopName'] + subscription_model = \ + SubscriptionModel(subscription_name=subscription["subscriptionName"], + operational_policy_name=subscription["operationalPolicyName"], + control_loop_name=control_loop_name, + status=AdministrativeState.LOCKED.value) db.session.add(subscription_model) return subscription_model diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_config.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_config.py index a6fe38ad..9c282ab7 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_config.py +++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_config.py @@ -69,9 +69,6 @@ class AppConfig(metaclass=MetaSingleton): self.streams_subscribes = app_config['config'].get('streams_subscribes') # TODO: aaf_creds variable should be removed on code cleanup self.aaf_creds = {'aaf_id': self.aaf_id, 'aaf_pass': self.aaf_pass} - # TODO: changes under discussion once resolve is confirmed will be removed - self.operational_policy_name = 'pmsh-operational-policy' - self.control_loop_name = 'pmsh-control-loop' @staticmethod def get_instance(): diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py index 26ada11b..d1790bbb 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py +++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py @@ -76,7 +76,9 @@ class AppConfig: self.operational_policy_name = conf['config'].get('operational_policy_name') self.control_loop_name = conf['config'].get('control_loop_name') self.sub_schema = _load_sub_schema_from_file() - self.subscription = Subscription(**conf['config']['pmsh_policy']['subscription']) + self.subscription = Subscription(self.control_loop_name, + self.operational_policy_name, + **conf['config']['pmsh_policy']['subscription']) self.nf_filter = None def __new__(cls, *args, **kwargs): @@ -129,7 +131,11 @@ class AppConfig: """ try: app_conf = self._get_pmsh_config() - self.subscription = Subscription(**app_conf['config']['pmsh_policy']['subscription']) + self.operational_policy_name = app_conf['config'].get('operational_policy_name') + self.control_loop_name = app_conf['config'].get('control_loop_name') + self.subscription = Subscription(self.control_loop_name, + self.operational_policy_name, + **app_conf['config']['pmsh_policy']['subscription']) logger.info("AppConfig data has been refreshed") except Exception: logger.error('Failed to refresh PMSH AppConfig') @@ -236,17 +242,17 @@ class _MrPub(_DmaapMrClient): except Exception as e: raise e - def publish_subscription_event_data(self, subscription, nf, app_conf): + def publish_subscription_event_data(self, subscription, nf): """ Update the Subscription dict with xnf and policy name then publish to DMaaP MR topic. Args: subscription (Subscription): the `Subscription` object. nf (NetworkFunction): the NetworkFunction to include in the event. - app_conf (AppConfig): the application configuration. """ try: - subscription_event = subscription.prepare_subscription_event(nf, app_conf) + subscription_event = subscription.prepare_subscription_event(nf) + logger.debug(f'Subscription event: {subscription_event}') self.publish_to_topic(subscription_event) except Exception as e: logger.error(f'Failed to publish to topic {self.topic_url}: {e}', exc_info=True) diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription.py b/components/pm-subscription-handler/pmsh_service/mod/subscription.py index 7e878cd1..bdfed189 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/subscription.py +++ b/components/pm-subscription-handler/pmsh_service/mod/subscription.py @@ -62,13 +62,15 @@ def _get_nf_objects(nf_sub_relationships): class Subscription: - def __init__(self, **kwargs): + def __init__(self, control_loop_name, operational_policy_name, **kwargs): self.subscriptionName = kwargs.get('subscriptionName') self.administrativeState = kwargs.get('administrativeState') self.fileBasedGP = kwargs.get('fileBasedGP') self.fileLocation = kwargs.get('fileLocation') self.nfFilter = kwargs.get('nfFilter') self.measurementGroups = kwargs.get('measurementGroups') + self.control_loop_name = control_loop_name + self.operational_policy_name = operational_policy_name self.create() def update_sub_params(self, admin_state, file_based_gp, file_location, meas_groups): @@ -87,8 +89,12 @@ class Subscription: existing_subscription = (SubscriptionModel.query.filter( SubscriptionModel.subscription_name == self.subscriptionName).one_or_none()) if existing_subscription is None: - new_subscription = SubscriptionModel(subscription_name=self.subscriptionName, - status=AdministrativeState.LOCKED.value) + new_subscription = \ + SubscriptionModel(subscription_name=self.subscriptionName, + operational_policy_name=self.operational_policy_name, + control_loop_name=self.control_loop_name, + status=AdministrativeState.LOCKED.value) + db.session.add(new_subscription) db.session.commit() return new_subscription @@ -117,18 +123,19 @@ class Subscription: finally: db.session.remove() - def prepare_subscription_event(self, nf, app_conf): + def prepare_subscription_event(self, nf): """Prepare the sub event for publishing Args: nf (NetworkFunction): the AAI nf. - app_conf (AppConfig): the application configuration. Returns: dict: the Subscription event to be published. """ try: - clean_sub = {k: v for k, v in self.__dict__.items() if k != 'nfFilter'} + clean_sub = \ + {k: v for k, v in self.__dict__.items() if + (k != 'nfFilter' and k != 'control_loop_name' and k != 'operational_policy_name')} if self.administrativeState == AdministrativeState.LOCKING.value: change_type = 'DELETE' else: @@ -139,9 +146,9 @@ class Subscription: 'ipAddress': nf.ipv4_address if nf.ipv6_address in (None, '') else nf.ipv6_address, 'blueprintName': nf.sdnc_model_name, 'blueprintVersion': nf.sdnc_model_version, - 'policyName': app_conf.operational_policy_name, + 'operationalPolicyName': self.operational_policy_name, 'changeType': change_type, - 'closedLoopControlName': app_conf.control_loop_name, + 'controlLoopName': self.control_loop_name, 'subscription': clean_sub} return sub_event except Exception as e: @@ -207,13 +214,12 @@ class Subscription: db.session.remove() return sub_models - def create_subscription_on_nfs(self, nfs, mr_pub, app_conf): + def create_subscription_on_nfs(self, nfs, mr_pub): """ Publishes an event to create a Subscription on an nf Args: nfs(list[NetworkFunction]): A list of NetworkFunction Objects. mr_pub (_MrPub): MR publisher - app_conf (AppConfig): the application configuration. """ try: existing_nfs = self.get_network_functions() @@ -221,26 +227,25 @@ class Subscription: for nf in [new_nf for new_nf in nfs if new_nf not in existing_nfs]: logger.info(f'Publishing event to create ' f'Sub: {self.subscriptionName} on nf: {nf.nf_name}') - mr_pub.publish_subscription_event_data(self, nf, app_conf) + mr_pub.publish_subscription_event_data(self, nf) self.add_network_function_to_subscription(nf, sub_model) self.update_sub_nf_status(self.subscriptionName, SubNfState.PENDING_CREATE.value, nf.nf_name) except Exception as err: raise Exception(f'Error publishing create event to MR: {err}') - def delete_subscription_from_nfs(self, nfs, mr_pub, app_conf): + def delete_subscription_from_nfs(self, nfs, mr_pub): """ Publishes an event to delete a Subscription from an nf Args: nfs(list[NetworkFunction]): A list of NetworkFunction Objects. mr_pub (_MrPub): MR publisher - app_conf (AppConfig): the application configuration. """ try: for nf in nfs: logger.debug(f'Publishing Event to delete ' f'Sub: {self.subscriptionName} from the nf: {nf.nf_name}') - mr_pub.publish_subscription_event_data(self, nf, app_conf) + mr_pub.publish_subscription_event_data(self, nf) self.update_sub_nf_status(self.subscriptionName, SubNfState.PENDING_DELETE.value, nf.nf_name) diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py index 22654b82..29f9121d 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py @@ -79,8 +79,7 @@ class SubscriptionHandler: self.app_conf.subscription.fileLocation, self.app_conf.subscription.measurementGroups) nfs_in_aai = aai_client.get_pmsh_nfs_from_aai(self.app_conf, self.app_conf.nf_filter) - self.app_conf.subscription.create_subscription_on_nfs(nfs_in_aai, self.mr_pub, - self.app_conf) + self.app_conf.subscription.create_subscription_on_nfs(nfs_in_aai, self.mr_pub) self.app_conf.subscription.update_subscription_status() def _deactivate(self): @@ -89,7 +88,7 @@ class SubscriptionHandler: self.stop_aai_event_thread() self.app_conf.subscription.administrativeState = AdministrativeState.LOCKING.value logger.info('Subscription is now LOCKING/DEACTIVATING.') - self.app_conf.subscription.delete_subscription_from_nfs(nfs, self.mr_pub, self.app_conf) + self.app_conf.subscription.delete_subscription_from_nfs(nfs, self.mr_pub) self.app_conf.subscription.update_subscription_status() def _start_aai_event_thread(self): @@ -117,8 +116,7 @@ class SubscriptionHandler: logger.info(f'Retry deletion of subscription ' f'{self.app_conf.subscription.subscriptionName} ' f'from NF: {nf.nf_name}') - self.app_conf.subscription.delete_subscription_from_nfs([nf], self.mr_pub, - self.app_conf) + self.app_conf.subscription.delete_subscription_from_nfs([nf], self.mr_pub) nf.increment_retry_count() else: logger.error(f'Failed to delete the subscription ' -- cgit 1.2.3-korg