diff options
author | ERIMROB <robertas.rimkus@est.tech> | 2020-06-19 13:09:44 +0100 |
---|---|---|
committer | ERIMROB <robertas.rimkus@est.tech> | 2020-06-30 02:15:21 +0100 |
commit | b80449b81a2deb3b6e2510a655de02f9375ef636 (patch) | |
tree | ee022a88ee287645c72f6716eb9c501b9cf06484 /components/pm-subscription-handler/pmsh_service/mod | |
parent | 20637908b156aeff53d7607f88d655d0becc1f11 (diff) |
[PMSH] Improve Failure Handling
Signed-off-by: ERIMROB <robertas.rimkus@est.tech>
Change-Id: I15d338321957a293e9f444a10cf3bb06f4212f3e
Issue-ID: DCAEGEN2-2157
Diffstat (limited to 'components/pm-subscription-handler/pmsh_service/mod')
5 files changed, 32 insertions, 31 deletions
diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py index 96f51431..b3957bc5 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py @@ -19,6 +19,8 @@ import json from enum import Enum +from requests import HTTPError + from mod import logger from mod.network_function import NetworkFunction @@ -44,22 +46,26 @@ def process_aai_events(mr_sub, mr_pub, app, app_conf): app_conf (AppConfig): the application configuration. """ app.app_context().push() - aai_events = mr_sub.get_from_topic('AAI-EVENT') - - if aai_events is not None and len(aai_events) != 0: - for entry in aai_events: - logger.debug(f'AAI-EVENT entry: {entry}') - entry = json.loads(entry) - event_header = entry['event-header'] - aai_xnf = entry['entity'] - action = event_header['action'] - entity_type = event_header['entity-type'] - xnf_name = aai_xnf['pnf-name'] if entity_type == XNFType.PNF.value else aai_xnf[ - 'vnf-name'] - new_status = aai_xnf['orchestration-status'] + try: + aai_events = mr_sub.get_from_topic('AAI-EVENT') + if len(aai_events) != 0: + for entry in aai_events: + logger.debug(f'AAI-EVENT entry: {entry}') + entry = json.loads(entry) + event_header = entry['event-header'] + aai_xnf = entry['entity'] + action = event_header['action'] + entity_type = event_header['entity-type'] + xnf_name = aai_xnf['pnf-name'] if entity_type == XNFType.PNF.value else aai_xnf[ + 'vnf-name'] + new_status = aai_xnf['orchestration-status'] - if app_conf.nf_filter.is_nf_in_filter(xnf_name, new_status): - _process_event(action, new_status, xnf_name, mr_pub, app_conf) + if app_conf.nf_filter.is_nf_in_filter(xnf_name, new_status): + _process_event(action, new_status, xnf_name, mr_pub, app_conf) + except HTTPError as e: + logger.debug(f'Failed to fetch AAI-EVENT messages from MR {e}') + except Exception as e: + logger.debug(f'Exception trying to process AAI events {e}') def _process_event(action, new_status, xnf_name, mr_pub, app_conf): diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py index fb6a5194..354d6b8d 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py +++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py @@ -200,12 +200,13 @@ class _MrPub(_DmaapMrClient): 'InvocationID': kwargs['invocation_id'], 'RequestID': kwargs['request_id'] } + logger.info(f'Attempting to publish event to {self.topic_url}') response = session.post(self.topic_url, headers=headers, auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), json=event_json, verify=False) response.raise_for_status() except Exception as e: - logger.debug(e) + logger.error(f'Failed to publish message to MR topic: {e}') raise def publish_subscription_event_data(self, subscription, xnf_name, app_conf): @@ -222,6 +223,7 @@ class _MrPub(_DmaapMrClient): self.publish_to_topic(subscription_event) except Exception as e: logger.debug(f'pmsh_utils.publish_subscription_event_data : {e}') + raise class _MrSub(_DmaapMrClient): @@ -241,9 +243,8 @@ class _MrSub(_DmaapMrClient): timeout: The request timeout value in mSec. Returns: - list[str]: the json response from DMaaP Message Router topic, else None. + list[str]: the json response from DMaaP Message Router topic. """ - topic_data = None try: session = requests.Session() headers = {'accept': 'application/json', 'content-type': 'application/json', @@ -256,10 +257,10 @@ class _MrSub(_DmaapMrClient): verify=False) response.raise_for_status() if response.ok: - topic_data = response.json() + return response.json() except Exception as e: logger.error(f'Failed to fetch message from MR: {e}') - return topic_data + raise class PeriodicTask(Timer): diff --git a/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py b/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py index 2b917cec..8a993828 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py @@ -18,8 +18,6 @@ import json -from tenacity import retry, wait_fixed, retry_if_exception_type - from mod import logger from mod.network_function import NetworkFunction from mod.subscription import Subscription, AdministrativeState, subscription_nf_states @@ -42,7 +40,6 @@ class PolicyResponseHandler: self.app_conf = app_conf self.app = app - @retry(wait=wait_fixed(5), retry=retry_if_exception_type(Exception)) def poll_policy_topic(self): """ This method polls MR for response from policy. It checks whether the message is for the @@ -62,7 +59,7 @@ class PolicyResponseHandler: self._handle_response(self.app_conf.subscription.subscriptionName, administrative_state, nf_name, response_message) except Exception as err: - raise Exception(f'Error trying to poll policy response topic on MR: {err}') + logger.error(f'Error trying to poll policy response topic on MR: {err}') @staticmethod def _handle_response(subscription_name, administrative_state, nf_name, response_message): @@ -82,4 +79,5 @@ class PolicyResponseHandler: policy_response_handle_functions[administrative_state][response_message]( subscription_name=subscription_name, status=sub_nf_status, nf_name=nf_name) except Exception as err: - raise Exception(f'Error changing nf_sub status in the DB: {err}') + logger.error(f'Error changing nf_sub status in the DB: {err}') + raise diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription.py b/components/pm-subscription-handler/pmsh_service/mod/subscription.py index d6b17cd9..b623cbdf 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/subscription.py +++ b/components/pm-subscription-handler/pmsh_service/mod/subscription.py @@ -17,8 +17,6 @@ # ============LICENSE_END===================================================== from enum import Enum -from tenacity import retry, retry_if_exception_type, wait_exponential, stop_after_attempt - from mod import db, logger from mod.api.db_models import SubscriptionModel, NfSubRelationalModel, NetworkFunctionModel from mod.network_function import NetworkFunction @@ -198,8 +196,6 @@ class Subscription: logger.debug(f'Failed to delete subscription: {self.subscriptionName} ' f'and it\'s relations from the DB: {e}') - @retry(wait=wait_exponential(multiplier=1, min=30, max=120), stop=stop_after_attempt(3), - retry=retry_if_exception_type(Exception)) def process_subscription(self, nfs, mr_pub, app_conf): action = 'Deactivate' sub_nf_state = SubNfState.PENDING_DELETE.value diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py index add8be42..112f994b 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py @@ -42,10 +42,10 @@ class SubscriptionHandler: if self.administrative_state == new_administrative_state: logger.info('Administrative State did not change in the Config') else: - logger.info(f'Administrative State has changed from {self.administrative_state} ' - f'to {new_administrative_state}.') self.current_nfs = aai.get_pmsh_nfs_from_aai(self.app_conf) self.current_sub = self.app_conf.subscription + logger.info(f'Administrative State has changed from {self.administrative_state} ' + f'to {new_administrative_state}.') self.administrative_state = new_administrative_state self.current_sub.process_subscription(self.current_nfs, self.mr_pub, self.app_conf) |