summaryrefslogtreecommitdiffstats
path: root/components/pm-subscription-handler/pmsh_service/mod
diff options
context:
space:
mode:
authorSagarS <sagar.shetty@est.tech>2021-12-09 16:59:51 +0000
committerSagarS <sagar.shetty@est.tech>2021-12-09 17:01:04 +0000
commitadcbd374f47858f02a5155ec63d678442fb7f4a0 (patch)
treecdb0449b2c08748845bf92254b45ee31619339c9 /components/pm-subscription-handler/pmsh_service/mod
parent15dde94eccad074dda76c420f900c977c184b66f (diff)
[DCAEGEN2] PMSH AAI changes with new subscription format
Issue-ID: DCAEGEN2-2912 Change-Id: Ibd21f8f7a01a31bd7db19a39af010d6805b41622 Signed-off-by: SagarS <sagar.shetty@est.tech>
Diffstat (limited to 'components/pm-subscription-handler/pmsh_service/mod')
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py143
-rw-r--r--components/pm-subscription-handler/pmsh_service/mod/api/services/subscription_service.py2
-rw-r--r--components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py12
3 files changed, 96 insertions, 61 deletions
diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py
index a00c164a..fe4166e7 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py
@@ -15,12 +15,14 @@
#
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================
-
import json
from enum import Enum
-
-from mod import logger
+from mod import logger, db
from mod.network_function import NetworkFunction
+from mod.pmsh_config import AppConfig, MRTopic
+from mod.api.db_models import SubscriptionModel
+from mod.network_function import NetworkFunctionFilter
+from mod.api.services import subscription_service
class XNFType(Enum):
@@ -33,53 +35,98 @@ class AAIEvent(Enum):
UPDATE = 'UPDATE'
-def process_aai_events(mr_sub, mr_pub, app, app_conf):
+def is_pnf_xnf(entity_type):
"""
- Processes AAI UPDATE events for each filtered xNFs where orchestration status is set to Active.
-
+ Applies measurement groups to network functions identified by AAI event
Args:
- mr_sub (_MrSub): MR subscriber
- mr_pub (_MrPub): MR publisher
- app (db): DB application
- app_conf (AppConfig): the application configuration.
+ entity_type (string): The type of network function
"""
- app.app_context().push()
- logger.info('Polling MR for XNF AAI events.')
- try:
- aai_events = mr_sub.get_from_topic('dcae_pmsh_aai_event')
- if aai_events is not None and len(aai_events) != 0:
- aai_events = [json.loads(e) for e in aai_events]
- xnf_events = [e for e in aai_events if e['event-header']['entity-type'] == (
- XNFType.PNF.value or XNFType.VNF.value)]
- for entry in xnf_events:
- logger.debug(f'AAI-EVENT entry: {entry}')
- aai_entity = entry['entity']
- action = entry['event-header']['action']
- entity_type = entry['event-header']['entity-type']
- xnf_name = aai_entity['pnf-name'] if entity_type == XNFType.PNF.value \
- else aai_entity['vnf-name']
- if aai_entity['orchestration-status'] != 'Active':
- logger.info(f'Skipping XNF {xnf_name} as its orchestration-status '
- f'is not "Active"')
- continue
- nf = NetworkFunction(nf_name=xnf_name,
- ipv4_address=aai_entity['ipaddress-v4-oam'],
- ipv6_address=aai_entity['ipaddress-v6-oam'],
- model_invariant_id=aai_entity['model-invariant-id'],
- model_version_id=aai_entity['model-version-id'])
- if not nf.set_nf_model_params(app_conf):
- continue
- if app_conf.nf_filter.is_nf_in_filter(nf):
- _process_event(action, nf, mr_pub, app_conf)
- except Exception as e:
- logger.error(f'Failed to process AAI event: {e}', exc_info=True)
+ return entity_type == (XNFType.PNF.value or XNFType.VNF.value)
+
+
+class AAIEventHandler:
+ """ Responsible for handling AAI update events in PMSH """
+
+ def __init__(self, app):
+ self.app = app
+ def execute(self):
+ """
+ Processes AAI UPDATE events for each filtered xNFs where
+ orchestration status is set to Active.
+ """
+ self.app.app_context().push()
+ logger.info('Polling MR for XNF AAI events.')
+ try:
+ aai_events = AppConfig.get_instance().get_from_topic(MRTopic.AAI_SUBSCRIBER.value,
+ 'dcae_pmsh_aai_event')
+ if aai_events is not None and len(aai_events) != 0:
+ pmsh_nf_names = list(nf.nf_name for nf in NetworkFunction.get_all())
+ aai_events = [json.loads(e) for e in aai_events]
+ xnf_events = [e for e in aai_events if is_pnf_xnf(e['event-header']['entity-type'])]
+ new_nfs = []
+ for entry in xnf_events:
+ logger.debug(f'AAI-EVENT entry: {entry}')
+ aai_entity = entry['entity']
+ action = entry['event-header']['action']
+ entity_type = entry['event-header']['entity-type']
+ xnf_name = aai_entity['pnf-name'] if entity_type == XNFType.PNF.value \
+ else aai_entity['vnf-name']
+ if aai_entity['orchestration-status'] != 'Active':
+ logger.info(f'Skipping XNF {xnf_name} as its orchestration-status '
+ f'is not "Active"')
+ continue
+ nf = NetworkFunction(nf_name=xnf_name,
+ ipv4_address=aai_entity['ipaddress-v4-oam'],
+ ipv6_address=aai_entity['ipaddress-v6-oam'],
+ model_invariant_id=aai_entity['model-invariant-id'],
+ model_version_id=aai_entity['model-version-id'])
+ if action == AAIEvent.DELETE.value and xnf_name in pmsh_nf_names:
+ logger.info(f'Delete event found for network function {nf.nf_name}')
+ NetworkFunction.delete(nf_name=nf.nf_name)
+ logger.info(f'{nf.nf_name} successfully deleted.')
+ elif action == AAIEvent.UPDATE.value and \
+ xnf_name not in pmsh_nf_names and \
+ nf.set_nf_model_params(AppConfig.get_instance()):
+ new_nfs.append(nf)
+ if new_nfs:
+ self.apply_nfs_to_subscriptions(new_nfs)
+ except Exception as e:
+ logger.error(f'Failed to process AAI event due to: {e}')
-def _process_event(action, nf, mr_pub, app_conf):
- if action == AAIEvent.UPDATE.value:
- logger.info(f'Update event found for network function {nf.nf_name}')
- app_conf.subscription.create_subscription_on_nfs([nf], mr_pub)
- elif action == AAIEvent.DELETE.value:
- logger.info(f'Delete event found for network function {nf.nf_name}')
- NetworkFunction.delete(nf_name=nf.nf_name)
- logger.info(f'{nf.nf_name} successfully deleted.')
+ @staticmethod
+ def apply_nfs_to_subscriptions(new_nfs):
+ """
+ Applies measurement groups to network functions identified by AAI event
+ Args:
+ new_nfs (list[NetworkFunction]): new network functions identified
+ """
+ subscriptions = db.session.query(SubscriptionModel).all()
+ if subscriptions:
+ for subscription in subscriptions:
+ try:
+ nf_filter = NetworkFunctionFilter(**subscription.network_filter.serialize())
+ filtered_nfs = []
+ for nf in new_nfs:
+ if nf_filter.is_nf_in_filter(nf):
+ filtered_nfs.append(nf)
+ if filtered_nfs:
+ subscription_service.save_filtered_nfs(filtered_nfs)
+ subscription_service. \
+ apply_subscription_to_nfs(filtered_nfs, subscription.subscription_name)
+ unlocked_meas_grp = subscription_service. \
+ apply_measurement_grp_to_nfs(filtered_nfs,
+ subscription.measurement_groups)
+ if unlocked_meas_grp:
+ subscription_service. \
+ publish_measurement_grp_to_nfs(subscription, filtered_nfs,
+ unlocked_meas_grp)
+ else:
+ logger.error(f'All measurement groups are locked for subscription: '
+ f'{subscription.subscription_name}, '
+ f'please verify/check measurement groups.')
+ db.session.commit()
+ except Exception as e:
+ logger.error(f'Failed to process AAI event for subscription: '
+ f'{subscription.subscription_name} due to: {e}')
+ db.session.remove()
diff --git a/components/pm-subscription-handler/pmsh_service/mod/api/services/subscription_service.py b/components/pm-subscription-handler/pmsh_service/mod/api/services/subscription_service.py
index 73467821..fc27f992 100644
--- a/components/pm-subscription-handler/pmsh_service/mod/api/services/subscription_service.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/api/services/subscription_service.py
@@ -129,7 +129,7 @@ def apply_measurement_grp_to_nfs(filtered_nfs, measurement_groups):
Saves measurement groups against nfs with status as PENDING_CREATE
Args:
- filtered_nfs (list[NetworkFunction])): list of filtered network functions
+ filtered_nfs (list[NetworkFunction]): list of filtered network functions
measurement_groups (list[MeasurementGroupModel]): list of measurement group
Returns:
diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py
index 29f9121d..5fbb9a6c 100644
--- a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py
@@ -18,9 +18,7 @@
from jsonschema import ValidationError
from mod import logger, aai_client
-from mod.aai_event_handler import process_aai_events
from mod.network_function import NetworkFunctionFilter
-from mod.pmsh_utils import PeriodicTask
from mod.subscription import AdministrativeState
@@ -73,7 +71,6 @@ class SubscriptionHandler:
def _activate(self, new_administrative_state):
if not self.app_conf.nf_filter:
self.app_conf.nf_filter = NetworkFunctionFilter(**self.app_conf.subscription.nfFilter)
- self._start_aai_event_thread()
self.app_conf.subscription.update_sub_params(new_administrative_state,
self.app_conf.subscription.fileBasedGP,
self.app_conf.subscription.fileLocation,
@@ -91,15 +88,6 @@ class SubscriptionHandler:
self.app_conf.subscription.delete_subscription_from_nfs(nfs, self.mr_pub)
self.app_conf.subscription.update_subscription_status()
- def _start_aai_event_thread(self):
- logger.info('Starting polling for NF info on AAI-EVENT topic on DMaaP MR.')
- self.aai_event_thread = PeriodicTask(20, process_aai_events, args=(self.aai_sub,
- self.mr_pub,
- self.app,
- self.app_conf))
- self.aai_event_thread.name = 'aai_event_thread'
- self.aai_event_thread.start()
-
def stop_aai_event_thread(self):
if self.aai_event_thread is not None:
self.aai_event_thread.cancel()