diff options
Diffstat (limited to 'components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py')
-rwxr-xr-x | components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py | 73 |
1 files changed, 73 insertions, 0 deletions
diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py index b665691d..4a77543b 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py +++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py @@ -15,12 +15,17 @@ # # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END===================================================== +import json +import threading import uuid import requests from requests.auth import HTTPBasicAuth +from tenacity import retry, wait_fixed, retry_if_exception_type import mod.pmsh_logging as logger +from mod.subscription import Subscription, SubNfState, AdministrativeState +from mod.network_function import NetworkFunction class AppConfig: @@ -168,3 +173,71 @@ class _MrSub(_DmaapMrClient): except Exception as e: logger.debug(e) return topic_data + + @staticmethod + def _handle_response(subscription_name, administrative_state, nf_name, response_message): + """ + Handles the response from Policy, updating the DB + + Args: + subscription_name (str): The subscription name + administrative_state (str): The administrative state of the subscription + nf_name (str): The network function name + response_message (str): The message in the response regarding the state (success|failed) + """ + logger.debug(f'Response from MR: Sub: {subscription_name} for ' + f'NF: {nf_name} received, updating the DB') + try: + sub_nf_status = subscription_nf_states[administrative_state][response_message].value + policy_response_handle_functions[administrative_state][response_message]( + subscription_name=subscription_name, status=sub_nf_status, nf_name=nf_name) + except Exception as err: + raise Exception(f'Error changing nf_sub status in the DB: {err}') + + @retry(wait=wait_fixed(5), retry=retry_if_exception_type(Exception)) + def poll_policy_topic(self, subscription_name, app): + """ + This method polls MR for response from policy. It checks whether the message is for the + relevant subscription and then handles the response + + Args: + subscription_name (str): The subscription name + app (app): Needed to push context for the db + """ + app.app_context().push() + administrative_state = Subscription.get(subscription_name).status + try: + response_data = self.get_from_topic('policy_response_consumer') + for data in response_data: + data = json.loads(data) + if data['status']['subscriptionName'] == subscription_name: + nf_name = data['status']['nfName'] + response_message = data['status']['message'] + self._handle_response(subscription_name, administrative_state, + nf_name, response_message) + threading.Timer(5, self.poll_policy_topic, [subscription_name, app]).start() + except Exception as err: + raise Exception(f'Error trying to poll MR: {err}') + + +subscription_nf_states = { + AdministrativeState.LOCKED.value: { + 'success': SubNfState.CREATED, + 'failed': SubNfState.DELETE_FAILED + }, + AdministrativeState.UNLOCKED.value: { + 'success': SubNfState.CREATED, + 'failed': SubNfState.CREATE_FAILED + } +} + +policy_response_handle_functions = { + AdministrativeState.LOCKED.value: { + 'success': NetworkFunction.delete, + 'failed': Subscription.update_sub_nf_status + }, + AdministrativeState.UNLOCKED.value: { + 'success': Subscription.update_sub_nf_status, + 'failed': Subscription.update_sub_nf_status + } +} |