diff options
Diffstat (limited to 'components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py')
-rwxr-xr-x | components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py | 83 |
1 files changed, 48 insertions, 35 deletions
diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py index 354d6b8d..50eb122b 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py +++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py @@ -67,6 +67,8 @@ class AppConfig(metaclass=ThreadSafeSingleton): raise self.aaf_creds = {'aaf_id': conf['config'].get('aaf_identity'), 'aaf_pass': conf['config'].get('aaf_password')} + self.enable_tls = conf['config'].get('enable_tls') + self.ca_cert_path = conf['config'].get('ca_cert_path') self.cert_path = conf['config'].get('cert_path') self.key_path = conf['config'].get('key_path') self.streams_subscribes = conf['config'].get('streams_subscribes') @@ -77,7 +79,7 @@ class AppConfig(metaclass=ThreadSafeSingleton): self.nf_filter = NetworkFunctionFilter(**self.subscription.nfFilter) @mdc_handler - @retry(wait=wait_fixed(2), stop=stop_after_attempt(5), retry=retry_if_exception_type(Exception)) + @retry(wait=wait_fixed(5), stop=stop_after_attempt(5), retry=retry_if_exception_type(Exception)) def _get_pmsh_config(self, **kwargs): """ Retrieves PMSH's configuration from Config binding service. If a non-2xx response is received, it retries after 2 seconds for 5 times before raising an exception. @@ -94,7 +96,7 @@ class AppConfig(metaclass=ThreadSafeSingleton): logger.info(f'Successfully fetched PMSH config from CBS: {config}') return config except Exception as err: - logger.error(f'Failed to get config from CBS: {err}') + logger.error(f'Failed to get config from CBS: {err}', exc_info=True) raise Exception def refresh_config(self): @@ -106,14 +108,16 @@ class AppConfig(metaclass=ThreadSafeSingleton): """ try: app_conf = self._get_pmsh_config() - except Exception: - logger.debug("Failed to refresh AppConfig data") - raise - self.subscription.administrativeState = \ - app_conf['policy']['subscription']['administrativeState'] - self.nf_filter.nf_names = app_conf['policy']['subscription']['nfFilter']['nfNames'] - self.nf_filter.nf_sw_version = app_conf['policy']['subscription']['nfFilter']['swVersions'] - logger.info("AppConfig data has been refreshed") + if "INVALID JSON" in app_conf.values(): + raise ValueError('Failed to refresh AppConfig: INVALID JSON') + self.subscription.administrativeState = app_conf['policy']['subscription'][ + 'administrativeState'] + self.nf_filter.nf_names = app_conf['policy']['subscription']['nfFilter']['nfNames'] + self.nf_filter.nf_sw_version = app_conf['policy']['subscription']['nfFilter'][ + 'swVersions'] + logger.info("AppConfig data has been refreshed") + except ValueError or Exception as e: + logger.error(e) def get_mr_sub(self, sub_name): """ @@ -129,9 +133,10 @@ class AppConfig(metaclass=ThreadSafeSingleton): KeyError: if the sub_name is not found. """ try: - return _MrSub(sub_name, self.aaf_creds, **self.streams_subscribes[sub_name]) + return _MrSub(sub_name, self.aaf_creds, self.ca_cert_path, + self.enable_tls, self.cert_params, **self.streams_subscribes[sub_name]) except KeyError as e: - logger.debug(e) + logger.error(f'Failed to get MrSub {sub_name}: {e}', exc_info=True) raise def get_mr_pub(self, pub_name): @@ -148,9 +153,10 @@ class AppConfig(metaclass=ThreadSafeSingleton): KeyError: if the sub_name is not found. """ try: - return _MrPub(pub_name, self.aaf_creds, **self.streams_publishes[pub_name]) + return _MrPub(pub_name, self.aaf_creds, self.ca_cert_path, + self.enable_tls, self.cert_params, **self.streams_publishes[pub_name]) except KeyError as e: - logger.debug(e) + logger.error(f'Failed to get MrPub {pub_name}: {e}', exc_info=True) raise @property @@ -159,29 +165,35 @@ class AppConfig(metaclass=ThreadSafeSingleton): Returns the tls artifact paths. Returns: - cert_path, key_path: the path to tls cert and key. + cert_path, key_path (tuple): the path to tls cert and key. """ return self.cert_path, self.key_path class _DmaapMrClient: - def __init__(self, aaf_creds, **kwargs): + def __init__(self, aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs): """ A DMaaP Message Router utility class. Sub classes should be invoked via the AppConfig.get_mr_{pub|sub} only. Args: - aaf_creds: a dict of aaf secure credentials. + aaf_creds (dict): a dict of aaf secure credentials. + ca_cert_path (str): path to the ca certificate. + enable_tls (bool): TLS if True, else False + cert_params (tuple): client side (cert, key) tuple. **kwargs: a dict of streams_{subscribes|publishes} data. """ self.topic_url = kwargs.get('dmaap_info').get('topic_url') self.aaf_id = aaf_creds.get('aaf_id') self.aaf_pass = aaf_creds.get('aaf_pass') + self.ca_cert_path = ca_cert_path + self.enable_tls = enable_tls + self.cert_params = cert_params class _MrPub(_DmaapMrClient): - def __init__(self, pub_name, aaf_creds, **kwargs): + def __init__(self, pub_name, aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs): self.pub_name = pub_name - super().__init__(aaf_creds, **kwargs) + super().__init__(aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs) @mdc_handler def publish_to_topic(self, event_json, **kwargs): @@ -189,7 +201,7 @@ class _MrPub(_DmaapMrClient): Publish the event to the DMaaP Message Router topic. Args: - event_json: the json data to be published. + event_json (dict): the json data to be published. Raises: Exception: if post request fails. @@ -200,36 +212,34 @@ class _MrPub(_DmaapMrClient): 'InvocationID': kwargs['invocation_id'], 'RequestID': kwargs['request_id'] } - logger.info(f'Attempting to publish event to {self.topic_url}') + logger.info(f'Publishing event to {self.topic_url}') response = session.post(self.topic_url, headers=headers, auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), json=event_json, - verify=False) + verify=(self.ca_cert_path if self.enable_tls else False)) response.raise_for_status() except Exception as e: - logger.error(f'Failed to publish message to MR topic: {e}') - raise + raise e def publish_subscription_event_data(self, subscription, xnf_name, app_conf): """ Update the Subscription dict with xnf and policy name then publish to DMaaP MR topic. Args: - subscription: the `Subscription` <Subscription> object. - xnf_name: the xnf to include in the event. + subscription (Subscription): the `Subscription` <Subscription> object. + xnf_name (str): the xnf to include in the event. app_conf (AppConfig): the application configuration. """ try: subscription_event = subscription.prepare_subscription_event(xnf_name, app_conf) self.publish_to_topic(subscription_event) except Exception as e: - logger.debug(f'pmsh_utils.publish_subscription_event_data : {e}') - raise + logger.error(f'Failed to publish to topic {self.topic_url}: {e}', exc_info=True) class _MrSub(_DmaapMrClient): - def __init__(self, sub_name, aaf_creds, **kwargs): + def __init__(self, sub_name, aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs): self.sub_name = sub_name - super().__init__(aaf_creds, **kwargs) + super().__init__(aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs) @mdc_handler def get_from_topic(self, consumer_id, consumer_group='dcae_pmsh_cg', timeout=1000, **kwargs): @@ -237,10 +247,10 @@ class _MrSub(_DmaapMrClient): Returns the json data from the MrTopic. Args: - consumer_id: Within your subscribers group, a name that uniquely + consumer_id (str): Within your subscribers group, a name that uniquely identifies your subscribers process. - consumer_group: A name that uniquely identifies your subscribers. - timeout: The request timeout value in mSec. + consumer_group (str): A name that uniquely identifies your subscribers. + timeout (int): The request timeout value in mSec. Returns: list[str]: the json response from DMaaP Message Router topic. @@ -250,11 +260,14 @@ class _MrSub(_DmaapMrClient): headers = {'accept': 'application/json', 'content-type': 'application/json', 'InvocationID': kwargs['invocation_id'], 'RequestID': kwargs['request_id']} - logger.debug(f'Fetching messages from MR topic: {self.topic_url}') + logger.info(f'Fetching messages from MR topic: {self.topic_url}') response = session.get(f'{self.topic_url}/{consumer_group}/{consumer_id}' f'?timeout={timeout}', auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), headers=headers, - verify=False) + verify=(self.ca_cert_path if self.enable_tls else False)) + if response.status_code == 503: + logger.error(f'MR Service is unavailable at present: {response.content}') + pass response.raise_for_status() if response.ok: return response.json() |