From adcbd374f47858f02a5155ec63d678442fb7f4a0 Mon Sep 17 00:00:00 2001 From: SagarS Date: Thu, 9 Dec 2021 16:59:51 +0000 Subject: [DCAEGEN2] PMSH AAI changes with new subscription format Issue-ID: DCAEGEN2-2912 Change-Id: Ibd21f8f7a01a31bd7db19a39af010d6805b41622 Signed-off-by: SagarS --- .../pmsh_service/mod/aai_event_handler.py | 143 ++++++++++++++------- .../mod/api/services/subscription_service.py | 2 +- .../pmsh_service/mod/subscription_handler.py | 12 -- 3 files changed, 96 insertions(+), 61 deletions(-) (limited to 'components/pm-subscription-handler/pmsh_service/mod') diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py index a00c164a..fe4166e7 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py @@ -15,12 +15,14 @@ # # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END===================================================== - import json from enum import Enum - -from mod import logger +from mod import logger, db from mod.network_function import NetworkFunction +from mod.pmsh_config import AppConfig, MRTopic +from mod.api.db_models import SubscriptionModel +from mod.network_function import NetworkFunctionFilter +from mod.api.services import subscription_service class XNFType(Enum): @@ -33,53 +35,98 @@ class AAIEvent(Enum): UPDATE = 'UPDATE' -def process_aai_events(mr_sub, mr_pub, app, app_conf): +def is_pnf_xnf(entity_type): """ - Processes AAI UPDATE events for each filtered xNFs where orchestration status is set to Active. - + Applies measurement groups to network functions identified by AAI event Args: - mr_sub (_MrSub): MR subscriber - mr_pub (_MrPub): MR publisher - app (db): DB application - app_conf (AppConfig): the application configuration. + entity_type (string): The type of network function """ - app.app_context().push() - logger.info('Polling MR for XNF AAI events.') - try: - aai_events = mr_sub.get_from_topic('dcae_pmsh_aai_event') - if aai_events is not None and len(aai_events) != 0: - aai_events = [json.loads(e) for e in aai_events] - xnf_events = [e for e in aai_events if e['event-header']['entity-type'] == ( - XNFType.PNF.value or XNFType.VNF.value)] - for entry in xnf_events: - logger.debug(f'AAI-EVENT entry: {entry}') - aai_entity = entry['entity'] - action = entry['event-header']['action'] - entity_type = entry['event-header']['entity-type'] - xnf_name = aai_entity['pnf-name'] if entity_type == XNFType.PNF.value \ - else aai_entity['vnf-name'] - if aai_entity['orchestration-status'] != 'Active': - logger.info(f'Skipping XNF {xnf_name} as its orchestration-status ' - f'is not "Active"') - continue - nf = NetworkFunction(nf_name=xnf_name, - ipv4_address=aai_entity['ipaddress-v4-oam'], - ipv6_address=aai_entity['ipaddress-v6-oam'], - model_invariant_id=aai_entity['model-invariant-id'], - model_version_id=aai_entity['model-version-id']) - if not nf.set_nf_model_params(app_conf): - continue - if app_conf.nf_filter.is_nf_in_filter(nf): - _process_event(action, nf, mr_pub, app_conf) - except Exception as e: - logger.error(f'Failed to process AAI event: {e}', exc_info=True) + return entity_type == (XNFType.PNF.value or XNFType.VNF.value) + + +class AAIEventHandler: + """ Responsible for handling AAI update events in PMSH """ + + def __init__(self, app): + self.app = app + def execute(self): + """ + Processes AAI UPDATE events for each filtered xNFs where + orchestration status is set to Active. + """ + self.app.app_context().push() + logger.info('Polling MR for XNF AAI events.') + try: + aai_events = AppConfig.get_instance().get_from_topic(MRTopic.AAI_SUBSCRIBER.value, + 'dcae_pmsh_aai_event') + if aai_events is not None and len(aai_events) != 0: + pmsh_nf_names = list(nf.nf_name for nf in NetworkFunction.get_all()) + aai_events = [json.loads(e) for e in aai_events] + xnf_events = [e for e in aai_events if is_pnf_xnf(e['event-header']['entity-type'])] + new_nfs = [] + for entry in xnf_events: + logger.debug(f'AAI-EVENT entry: {entry}') + aai_entity = entry['entity'] + action = entry['event-header']['action'] + entity_type = entry['event-header']['entity-type'] + xnf_name = aai_entity['pnf-name'] if entity_type == XNFType.PNF.value \ + else aai_entity['vnf-name'] + if aai_entity['orchestration-status'] != 'Active': + logger.info(f'Skipping XNF {xnf_name} as its orchestration-status ' + f'is not "Active"') + continue + nf = NetworkFunction(nf_name=xnf_name, + ipv4_address=aai_entity['ipaddress-v4-oam'], + ipv6_address=aai_entity['ipaddress-v6-oam'], + model_invariant_id=aai_entity['model-invariant-id'], + model_version_id=aai_entity['model-version-id']) + if action == AAIEvent.DELETE.value and xnf_name in pmsh_nf_names: + logger.info(f'Delete event found for network function {nf.nf_name}') + NetworkFunction.delete(nf_name=nf.nf_name) + logger.info(f'{nf.nf_name} successfully deleted.') + elif action == AAIEvent.UPDATE.value and \ + xnf_name not in pmsh_nf_names and \ + nf.set_nf_model_params(AppConfig.get_instance()): + new_nfs.append(nf) + if new_nfs: + self.apply_nfs_to_subscriptions(new_nfs) + except Exception as e: + logger.error(f'Failed to process AAI event due to: {e}') -def _process_event(action, nf, mr_pub, app_conf): - if action == AAIEvent.UPDATE.value: - logger.info(f'Update event found for network function {nf.nf_name}') - app_conf.subscription.create_subscription_on_nfs([nf], mr_pub) - elif action == AAIEvent.DELETE.value: - logger.info(f'Delete event found for network function {nf.nf_name}') - NetworkFunction.delete(nf_name=nf.nf_name) - logger.info(f'{nf.nf_name} successfully deleted.') + @staticmethod + def apply_nfs_to_subscriptions(new_nfs): + """ + Applies measurement groups to network functions identified by AAI event + Args: + new_nfs (list[NetworkFunction]): new network functions identified + """ + subscriptions = db.session.query(SubscriptionModel).all() + if subscriptions: + for subscription in subscriptions: + try: + nf_filter = NetworkFunctionFilter(**subscription.network_filter.serialize()) + filtered_nfs = [] + for nf in new_nfs: + if nf_filter.is_nf_in_filter(nf): + filtered_nfs.append(nf) + if filtered_nfs: + subscription_service.save_filtered_nfs(filtered_nfs) + subscription_service. \ + apply_subscription_to_nfs(filtered_nfs, subscription.subscription_name) + unlocked_meas_grp = subscription_service. \ + apply_measurement_grp_to_nfs(filtered_nfs, + subscription.measurement_groups) + if unlocked_meas_grp: + subscription_service. \ + publish_measurement_grp_to_nfs(subscription, filtered_nfs, + unlocked_meas_grp) + else: + logger.error(f'All measurement groups are locked for subscription: ' + f'{subscription.subscription_name}, ' + f'please verify/check measurement groups.') + db.session.commit() + except Exception as e: + logger.error(f'Failed to process AAI event for subscription: ' + f'{subscription.subscription_name} due to: {e}') + db.session.remove() diff --git a/components/pm-subscription-handler/pmsh_service/mod/api/services/subscription_service.py b/components/pm-subscription-handler/pmsh_service/mod/api/services/subscription_service.py index 73467821..fc27f992 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/api/services/subscription_service.py +++ b/components/pm-subscription-handler/pmsh_service/mod/api/services/subscription_service.py @@ -129,7 +129,7 @@ def apply_measurement_grp_to_nfs(filtered_nfs, measurement_groups): Saves measurement groups against nfs with status as PENDING_CREATE Args: - filtered_nfs (list[NetworkFunction])): list of filtered network functions + filtered_nfs (list[NetworkFunction]): list of filtered network functions measurement_groups (list[MeasurementGroupModel]): list of measurement group Returns: diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py index 29f9121d..5fbb9a6c 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py @@ -18,9 +18,7 @@ from jsonschema import ValidationError from mod import logger, aai_client -from mod.aai_event_handler import process_aai_events from mod.network_function import NetworkFunctionFilter -from mod.pmsh_utils import PeriodicTask from mod.subscription import AdministrativeState @@ -73,7 +71,6 @@ class SubscriptionHandler: def _activate(self, new_administrative_state): if not self.app_conf.nf_filter: self.app_conf.nf_filter = NetworkFunctionFilter(**self.app_conf.subscription.nfFilter) - self._start_aai_event_thread() self.app_conf.subscription.update_sub_params(new_administrative_state, self.app_conf.subscription.fileBasedGP, self.app_conf.subscription.fileLocation, @@ -91,15 +88,6 @@ class SubscriptionHandler: self.app_conf.subscription.delete_subscription_from_nfs(nfs, self.mr_pub) self.app_conf.subscription.update_subscription_status() - def _start_aai_event_thread(self): - logger.info('Starting polling for NF info on AAI-EVENT topic on DMaaP MR.') - self.aai_event_thread = PeriodicTask(20, process_aai_events, args=(self.aai_sub, - self.mr_pub, - self.app, - self.app_conf)) - self.aai_event_thread.name = 'aai_event_thread' - self.aai_event_thread.start() - def stop_aai_event_thread(self): if self.aai_event_thread is not None: self.aai_event_thread.cancel() -- cgit 1.2.3-korg