summaryrefslogtreecommitdiffstats
path: root/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py')
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py73
1 files changed, 73 insertions, 0 deletions
diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
index b665691d..4a77543b 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
@@ -15,12 +15,17 @@
#
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================
+import json
+import threading
import uuid
import requests
from requests.auth import HTTPBasicAuth
+from tenacity import retry, wait_fixed, retry_if_exception_type
import mod.pmsh_logging as logger
+from mod.subscription import Subscription, SubNfState, AdministrativeState
+from mod.network_function import NetworkFunction
class AppConfig:
@@ -168,3 +173,71 @@ class _MrSub(_DmaapMrClient):
except Exception as e:
logger.debug(e)
return topic_data
+
+ @staticmethod
+ def _handle_response(subscription_name, administrative_state, nf_name, response_message):
+ """
+ Handles the response from Policy, updating the DB
+
+ Args:
+ subscription_name (str): The subscription name
+ administrative_state (str): The administrative state of the subscription
+ nf_name (str): The network function name
+ response_message (str): The message in the response regarding the state (success|failed)
+ """
+ logger.debug(f'Response from MR: Sub: {subscription_name} for '
+ f'NF: {nf_name} received, updating the DB')
+ try:
+ sub_nf_status = subscription_nf_states[administrative_state][response_message].value
+ policy_response_handle_functions[administrative_state][response_message](
+ subscription_name=subscription_name, status=sub_nf_status, nf_name=nf_name)
+ except Exception as err:
+ raise Exception(f'Error changing nf_sub status in the DB: {err}')
+
+ @retry(wait=wait_fixed(5), retry=retry_if_exception_type(Exception))
+ def poll_policy_topic(self, subscription_name, app):
+ """
+ This method polls MR for response from policy. It checks whether the message is for the
+ relevant subscription and then handles the response
+
+ Args:
+ subscription_name (str): The subscription name
+ app (app): Needed to push context for the db
+ """
+ app.app_context().push()
+ administrative_state = Subscription.get(subscription_name).status
+ try:
+ response_data = self.get_from_topic('policy_response_consumer')
+ for data in response_data:
+ data = json.loads(data)
+ if data['status']['subscriptionName'] == subscription_name:
+ nf_name = data['status']['nfName']
+ response_message = data['status']['message']
+ self._handle_response(subscription_name, administrative_state,
+ nf_name, response_message)
+ threading.Timer(5, self.poll_policy_topic, [subscription_name, app]).start()
+ except Exception as err:
+ raise Exception(f'Error trying to poll MR: {err}')
+
+
+subscription_nf_states = {
+ AdministrativeState.LOCKED.value: {
+ 'success': SubNfState.CREATED,
+ 'failed': SubNfState.DELETE_FAILED
+ },
+ AdministrativeState.UNLOCKED.value: {
+ 'success': SubNfState.CREATED,
+ 'failed': SubNfState.CREATE_FAILED
+ }
+}
+
+policy_response_handle_functions = {
+ AdministrativeState.LOCKED.value: {
+ 'success': NetworkFunction.delete,
+ 'failed': Subscription.update_sub_nf_status
+ },
+ AdministrativeState.UNLOCKED.value: {
+ 'success': Subscription.update_sub_nf_status,
+ 'failed': Subscription.update_sub_nf_status
+ }
+}