summaryrefslogtreecommitdiffstats
path: root/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py')
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py83
1 files changed, 48 insertions, 35 deletions
diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
index 354d6b8d..50eb122b 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
@@ -67,6 +67,8 @@ class AppConfig(metaclass=ThreadSafeSingleton):
raise
self.aaf_creds = {'aaf_id': conf['config'].get('aaf_identity'),
'aaf_pass': conf['config'].get('aaf_password')}
+ self.enable_tls = conf['config'].get('enable_tls')
+ self.ca_cert_path = conf['config'].get('ca_cert_path')
self.cert_path = conf['config'].get('cert_path')
self.key_path = conf['config'].get('key_path')
self.streams_subscribes = conf['config'].get('streams_subscribes')
@@ -77,7 +79,7 @@ class AppConfig(metaclass=ThreadSafeSingleton):
self.nf_filter = NetworkFunctionFilter(**self.subscription.nfFilter)
@mdc_handler
- @retry(wait=wait_fixed(2), stop=stop_after_attempt(5), retry=retry_if_exception_type(Exception))
+ @retry(wait=wait_fixed(5), stop=stop_after_attempt(5), retry=retry_if_exception_type(Exception))
def _get_pmsh_config(self, **kwargs):
""" Retrieves PMSH's configuration from Config binding service. If a non-2xx response
is received, it retries after 2 seconds for 5 times before raising an exception.
@@ -94,7 +96,7 @@ class AppConfig(metaclass=ThreadSafeSingleton):
logger.info(f'Successfully fetched PMSH config from CBS: {config}')
return config
except Exception as err:
- logger.error(f'Failed to get config from CBS: {err}')
+ logger.error(f'Failed to get config from CBS: {err}', exc_info=True)
raise Exception
def refresh_config(self):
@@ -106,14 +108,16 @@ class AppConfig(metaclass=ThreadSafeSingleton):
"""
try:
app_conf = self._get_pmsh_config()
- except Exception:
- logger.debug("Failed to refresh AppConfig data")
- raise
- self.subscription.administrativeState = \
- app_conf['policy']['subscription']['administrativeState']
- self.nf_filter.nf_names = app_conf['policy']['subscription']['nfFilter']['nfNames']
- self.nf_filter.nf_sw_version = app_conf['policy']['subscription']['nfFilter']['swVersions']
- logger.info("AppConfig data has been refreshed")
+ if "INVALID JSON" in app_conf.values():
+ raise ValueError('Failed to refresh AppConfig: INVALID JSON')
+ self.subscription.administrativeState = app_conf['policy']['subscription'][
+ 'administrativeState']
+ self.nf_filter.nf_names = app_conf['policy']['subscription']['nfFilter']['nfNames']
+ self.nf_filter.nf_sw_version = app_conf['policy']['subscription']['nfFilter'][
+ 'swVersions']
+ logger.info("AppConfig data has been refreshed")
+ except ValueError or Exception as e:
+ logger.error(e)
def get_mr_sub(self, sub_name):
"""
@@ -129,9 +133,10 @@ class AppConfig(metaclass=ThreadSafeSingleton):
KeyError: if the sub_name is not found.
"""
try:
- return _MrSub(sub_name, self.aaf_creds, **self.streams_subscribes[sub_name])
+ return _MrSub(sub_name, self.aaf_creds, self.ca_cert_path,
+ self.enable_tls, self.cert_params, **self.streams_subscribes[sub_name])
except KeyError as e:
- logger.debug(e)
+ logger.error(f'Failed to get MrSub {sub_name}: {e}', exc_info=True)
raise
def get_mr_pub(self, pub_name):
@@ -148,9 +153,10 @@ class AppConfig(metaclass=ThreadSafeSingleton):
KeyError: if the sub_name is not found.
"""
try:
- return _MrPub(pub_name, self.aaf_creds, **self.streams_publishes[pub_name])
+ return _MrPub(pub_name, self.aaf_creds, self.ca_cert_path,
+ self.enable_tls, self.cert_params, **self.streams_publishes[pub_name])
except KeyError as e:
- logger.debug(e)
+ logger.error(f'Failed to get MrPub {pub_name}: {e}', exc_info=True)
raise
@property
@@ -159,29 +165,35 @@ class AppConfig(metaclass=ThreadSafeSingleton):
Returns the tls artifact paths.
Returns:
- cert_path, key_path: the path to tls cert and key.
+ cert_path, key_path (tuple): the path to tls cert and key.
"""
return self.cert_path, self.key_path
class _DmaapMrClient:
- def __init__(self, aaf_creds, **kwargs):
+ def __init__(self, aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs):
"""
A DMaaP Message Router utility class.
Sub classes should be invoked via the AppConfig.get_mr_{pub|sub} only.
Args:
- aaf_creds: a dict of aaf secure credentials.
+ aaf_creds (dict): a dict of aaf secure credentials.
+ ca_cert_path (str): path to the ca certificate.
+ enable_tls (bool): TLS if True, else False
+ cert_params (tuple): client side (cert, key) tuple.
**kwargs: a dict of streams_{subscribes|publishes} data.
"""
self.topic_url = kwargs.get('dmaap_info').get('topic_url')
self.aaf_id = aaf_creds.get('aaf_id')
self.aaf_pass = aaf_creds.get('aaf_pass')
+ self.ca_cert_path = ca_cert_path
+ self.enable_tls = enable_tls
+ self.cert_params = cert_params
class _MrPub(_DmaapMrClient):
- def __init__(self, pub_name, aaf_creds, **kwargs):
+ def __init__(self, pub_name, aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs):
self.pub_name = pub_name
- super().__init__(aaf_creds, **kwargs)
+ super().__init__(aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs)
@mdc_handler
def publish_to_topic(self, event_json, **kwargs):
@@ -189,7 +201,7 @@ class _MrPub(_DmaapMrClient):
Publish the event to the DMaaP Message Router topic.
Args:
- event_json: the json data to be published.
+ event_json (dict): the json data to be published.
Raises:
Exception: if post request fails.
@@ -200,36 +212,34 @@ class _MrPub(_DmaapMrClient):
'InvocationID': kwargs['invocation_id'],
'RequestID': kwargs['request_id']
}
- logger.info(f'Attempting to publish event to {self.topic_url}')
+ logger.info(f'Publishing event to {self.topic_url}')
response = session.post(self.topic_url, headers=headers,
auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), json=event_json,
- verify=False)
+ verify=(self.ca_cert_path if self.enable_tls else False))
response.raise_for_status()
except Exception as e:
- logger.error(f'Failed to publish message to MR topic: {e}')
- raise
+ raise e
def publish_subscription_event_data(self, subscription, xnf_name, app_conf):
"""
Update the Subscription dict with xnf and policy name then publish to DMaaP MR topic.
Args:
- subscription: the `Subscription` <Subscription> object.
- xnf_name: the xnf to include in the event.
+ subscription (Subscription): the `Subscription` <Subscription> object.
+ xnf_name (str): the xnf to include in the event.
app_conf (AppConfig): the application configuration.
"""
try:
subscription_event = subscription.prepare_subscription_event(xnf_name, app_conf)
self.publish_to_topic(subscription_event)
except Exception as e:
- logger.debug(f'pmsh_utils.publish_subscription_event_data : {e}')
- raise
+ logger.error(f'Failed to publish to topic {self.topic_url}: {e}', exc_info=True)
class _MrSub(_DmaapMrClient):
- def __init__(self, sub_name, aaf_creds, **kwargs):
+ def __init__(self, sub_name, aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs):
self.sub_name = sub_name
- super().__init__(aaf_creds, **kwargs)
+ super().__init__(aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs)
@mdc_handler
def get_from_topic(self, consumer_id, consumer_group='dcae_pmsh_cg', timeout=1000, **kwargs):
@@ -237,10 +247,10 @@ class _MrSub(_DmaapMrClient):
Returns the json data from the MrTopic.
Args:
- consumer_id: Within your subscribers group, a name that uniquely
+ consumer_id (str): Within your subscribers group, a name that uniquely
identifies your subscribers process.
- consumer_group: A name that uniquely identifies your subscribers.
- timeout: The request timeout value in mSec.
+ consumer_group (str): A name that uniquely identifies your subscribers.
+ timeout (int): The request timeout value in mSec.
Returns:
list[str]: the json response from DMaaP Message Router topic.
@@ -250,11 +260,14 @@ class _MrSub(_DmaapMrClient):
headers = {'accept': 'application/json', 'content-type': 'application/json',
'InvocationID': kwargs['invocation_id'],
'RequestID': kwargs['request_id']}
- logger.debug(f'Fetching messages from MR topic: {self.topic_url}')
+ logger.info(f'Fetching messages from MR topic: {self.topic_url}')
response = session.get(f'{self.topic_url}/{consumer_group}/{consumer_id}'
f'?timeout={timeout}',
auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), headers=headers,
- verify=False)
+ verify=(self.ca_cert_path if self.enable_tls else False))
+ if response.status_code == 503:
+ logger.error(f'MR Service is unavailable at present: {response.content}')
+ pass
response.raise_for_status()
if response.ok:
return response.json()