diff options
author | 2020-02-24 14:13:03 +0000 | |
---|---|---|
committer | 2020-02-26 15:25:50 +0000 | |
commit | e7f6914ca5397987eddc6788a6e378c51b12ce52 (patch) | |
tree | 1c4a1015afa1030ee3e75fdf78c282a2ce994433 /components/pm-subscription-handler/pmsh_service | |
parent | de549f5f1bb3e0a6f94e9755ae0800b469114113 (diff) |
Handle AAI Update and Delete events for PMSH
Change-Id: I7f84e4429011bbaea4de23077ce23629b897fd7d
Issue-ID: DCAEGEN2-1846
Signed-off-by: emartin <ephraim.martin@est.tech>
Diffstat (limited to 'components/pm-subscription-handler/pmsh_service')
6 files changed, 150 insertions, 17 deletions
diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py new file mode 100755 index 00000000..f8254e52 --- /dev/null +++ b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py @@ -0,0 +1,90 @@ +# ============LICENSE_START=================================================== +# Copyright (C) 2020 Nordix Foundation. +# ============================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# ============LICENSE_END===================================================== + +import json +from enum import Enum + +from mod import pmsh_logging as logger +from mod.network_function import NetworkFunction +from mod.subscription import NetworkFunctionFilter + + +class XNFType(Enum): + PNF = 'pnf' + VNF = 'vnf' + + +class AAIEvent(Enum): + DELETE = 'DELETE' + UPDATE = 'UPDATE' + + +class OrchestrationStatus(Enum): + ACTIVE = 'Active' + INVENTORIED = 'Inventoried' + + +def process_aai_events(mr_sub, subscription, mr_pub, app): + """ + Processes AAI UPDATE events for each filtered xNFs where orchestration status is set to Active. + + Args: + mr_sub (_MrSub): MR subscriber + subscription (Subscription): The current subscription object + mr_pub (_MrPub): MR publisher + app (db): DB application + """ + app.app_context().push() + aai_events = mr_sub.get_from_topic('AAI-EVENT') + + if _aai_event_exists(aai_events): + for entry in aai_events: + logger.debug(f'AAI-EVENT entry: {entry}') + entry = json.loads(entry) + event_header = entry['event-header'] + aai_xnf = entry['entity'] + action = event_header['action'] + entity_type = event_header['entity-type'] + xnf_name = aai_xnf['pnf-name'] if entity_type == XNFType.PNF.value else aai_xnf[ + 'vnf-name'] + new_status = aai_xnf['orchestration-status'] + + if NetworkFunctionFilter(**subscription.nfFilter).is_nf_in_filter(xnf_name): + _process_event(action, new_status, xnf_name, subscription, mr_pub) + + +def _process_event(action, new_status, xnf_name, subscription, mr_pub): + if action == AAIEvent.UPDATE.value: + logger.debug(f'Update event found for network function {xnf_name}') + local_xnf = NetworkFunction.get(xnf_name) + + if local_xnf is None: + logger.debug(f'Activating subscription for network function {xnf_name}') + subscription.process_subscription([NetworkFunction( + nf_name=xnf_name, orchestration_status=new_status)], mr_pub) + else: + logger.debug(f"Update Event for network function {xnf_name} will not be processed " + f" as it's state is set to {local_xnf.orchestration_status}.") + elif action == AAIEvent.DELETE.value: + logger.debug(f'Delete event found for network function {xnf_name}') + NetworkFunction.delete(nf_name=xnf_name) + logger.debug(f'{xnf_name} successfully deleted.') + + +def _aai_event_exists(aai_events): + return aai_events is not None and len(aai_events) != 0 diff --git a/components/pm-subscription-handler/pmsh_service/mod/db_models.py b/components/pm-subscription-handler/pmsh_service/mod/db_models.py index 479d40e5..d1836760 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/db_models.py +++ b/components/pm-subscription-handler/pmsh_service/mod/db_models.py @@ -67,6 +67,9 @@ class NetworkFunctionModel(db.Model): class NfSubRelationalModel(db.Model): __tablename__ = 'nf_to_sub_rel' + __mapper_args__ = { + 'confirm_deleted_rows': False + } id = Column(Integer, primary_key=True, autoincrement=True) subscription_name = Column( String, diff --git a/components/pm-subscription-handler/pmsh_service/mod/network_function.py b/components/pm-subscription-handler/pmsh_service/mod/network_function.py index 9f21cc66..c4b9b56c 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/network_function.py +++ b/components/pm-subscription-handler/pmsh_service/mod/network_function.py @@ -32,6 +32,13 @@ class NetworkFunction: def __str__(self): return f'nf-name: {self.nf_name}, orchestration-status: {self.orchestration_status}' + def __eq__(self, other): + return self.nf_name == other.nf_name and \ + self.orchestration_status == other.orchestration_status + + def __hash__(self): + return hash((self.nf_name, self.orchestration_status)) + def create(self): """ Creates a NetworkFunction database entry """ existing_nf = NetworkFunctionModel.query.filter( @@ -72,8 +79,7 @@ class NetworkFunction: def delete(**kwargs): """ Deletes a network function from the database """ nf_name = kwargs['nf_name'] - NetworkFunctionModel.query.filter( - NetworkFunctionModel.nf_name == nf_name). \ - delete(synchronize_session='evaluate') + nf = NetworkFunctionModel.query.filter( + NetworkFunctionModel.nf_name == nf_name).one_or_none() - db.session.commit() + db.session.delete(nf) if nf else None diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py index 4a77543b..9ff0c653 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py +++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py @@ -18,14 +18,15 @@ import json import threading import uuid +from threading import Timer import requests from requests.auth import HTTPBasicAuth from tenacity import retry, wait_fixed, retry_if_exception_type import mod.pmsh_logging as logger -from mod.subscription import Subscription, SubNfState, AdministrativeState from mod.network_function import NetworkFunction +from mod.subscription import Subscription, SubNfState, AdministrativeState class AppConfig: @@ -163,6 +164,7 @@ class _MrSub(_DmaapMrClient): try: session = requests.Session() headers = {'accept': 'application/json', 'content-type': 'application/json'} + logger.debug(f'Request sent to MR topic: {self.topic_url}') response = session.get(f'{self.topic_url}/{consumer_group}/{consumer_id}' f'?timeout={timeout}', auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), headers=headers, @@ -241,3 +243,12 @@ policy_response_handle_functions = { 'failed': Subscription.update_sub_nf_status } } + + +class PeriodicTask(Timer): + """ + See :class:`Timer`. + """ + def run(self): + while not self.finished.wait(self.interval): + self.function(*self.args, **self.kwargs) diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription.py b/components/pm-subscription-handler/pmsh_service/mod/subscription.py index 031609aa..7a0b88c1 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/subscription.py +++ b/components/pm-subscription-handler/pmsh_service/mod/subscription.py @@ -143,12 +143,19 @@ class Subscription: db.session.commit() def delete_subscription(self): - """ Deletes a subscription from the database """ - SubscriptionModel.query.filter( - SubscriptionModel.subscription_name == self.subscriptionName). \ - delete(synchronize_session='evaluate') - - db.session.commit() + """ Deletes a subscription and all its association from the database. A network function + that is only associated with the subscription being removed will also be deleted.""" + subscription = SubscriptionModel.query.filter( + SubscriptionModel.subscription_name == self.subscriptionName).one_or_none() + if subscription: + for nf_relationship in subscription.nfs: + other_nf_relationship = NfSubRelationalModel.query.filter( + NfSubRelationalModel.subscription_name != self.subscriptionName, + NfSubRelationalModel.nf_name == nf_relationship.nf_name).one_or_none() + if not other_nf_relationship: + db.session.delete(nf_relationship.nf) + db.session.delete(subscription) + db.session.commit() @retry(wait=wait_exponential(multiplier=1, min=30, max=120), stop=stop_after_attempt(3), retry=retry_if_exception_type(Exception)) diff --git a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py index 5c81250f..31d1d079 100755 --- a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py +++ b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py @@ -21,12 +21,14 @@ import threading import mod.aai_client as aai import mod.pmsh_logging as logger from mod import db, create_app, launch_api_server +from mod.aai_event_handler import process_aai_events from mod.config_handler import ConfigHandler -from mod.pmsh_utils import AppConfig +from mod.pmsh_utils import AppConfig, PeriodicTask from mod.subscription import Subscription, AdministrativeState -def subscription_processor(config_handler, administrative_state, mr_pub, app): +def subscription_processor(config_handler, administrative_state, mr_pub, app, + mr_aai_event_subscriber): """ Checks for changes of administrative state in config and proceeds to process the Subscription if a change has occurred @@ -36,10 +38,10 @@ def subscription_processor(config_handler, administrative_state, mr_pub, app): administrative_state (str): The administrative state mr_pub (_MrPub): MR publisher app (db): DB application + mr_aai_event_subscriber (_MrSub): AAI events MR subscriber """ app.app_context().push() config = config_handler.get_config() - sub, nfs = aai.get_pmsh_subscription_data(config) new_administrative_state = config['policy']['subscription']['administrativeState'] polling_period = 30.0 @@ -47,17 +49,29 @@ def subscription_processor(config_handler, administrative_state, mr_pub, app): if administrative_state == new_administrative_state: logger.debug('Administrative State did not change in the Config') else: + logger.debug(f'Administrative State changed from "{administrative_state}" "to ' + f'"{new_administrative_state}".') + sub, nfs = aai.get_pmsh_subscription_data(config) sub.process_subscription(nfs, mr_pub) + aai_event_thread = PeriodicTask(10, process_aai_events, args=(mr_aai_event_subscriber, + sub, mr_pub, app)) + + if new_administrative_state == AdministrativeState.UNLOCKED.value: + logger.debug('Listening to AAI-EVENT topic in MR.') + aai_event_thread.start() + else: + logger.debug('Stopping to listen to AAI-EVENT topic in MR.') + aai_event_thread.cancel() except Exception as err: logger.debug(f'Error occurred during the activation/deactivation process {err}') threading.Timer(polling_period, subscription_processor, - [config_handler, new_administrative_state, mr_pub, app]).start() + [config_handler, new_administrative_state, mr_pub, app, + mr_aai_event_subscriber]).start() def main(): - try: config_handler = ConfigHandler() config = config_handler.get_config() @@ -68,6 +82,7 @@ def main(): sub, nfs = aai.get_pmsh_subscription_data(config) mr_pub = app_conf.get_mr_pub('policy_pm_publisher') mr_sub = app_conf.get_mr_sub('policy_pm_subscriber') + mr_aai_event_subscriber = app_conf.get_mr_sub('aai_subscriber') initial_start_delay = 5.0 administrative_state = AdministrativeState.LOCKED.value @@ -76,7 +91,8 @@ def main(): administrative_state = subscription_in_db.status threading.Timer(initial_start_delay, subscription_processor, - [config_handler, administrative_state, mr_pub, app]).start() + [config_handler, administrative_state, mr_pub, + app, mr_aai_event_subscriber]).start() threading.Timer(20.0, mr_sub.poll_policy_topic, [sub.subscriptionName, app]).start() |