From 0464347539f931847ab578ff935994aef0102352 Mon Sep 17 00:00:00 2001 From: efiacor Date: Mon, 22 Jun 2020 18:51:43 +0100 Subject: [PMSH] Add enable_tls boolean flag to config Signed-off-by: efiacor Change-Id: I19f71b690f743980eaa149c9b2c76fecb98a0120 Issue-ID: DCAEGEN2-2146 --- .../pmsh_service/mod/__init__.py | 7 +- .../pmsh_service/mod/aai_client.py | 20 +++--- .../pmsh_service/mod/aai_event_handler.py | 30 ++++---- .../pmsh_service/mod/exit_handler.py | 17 +++-- .../pmsh_service/mod/pmsh_utils.py | 83 +++++++++++++--------- .../pmsh_service/mod/policy_response_handler.py | 6 +- .../pmsh_service/mod/subscription.py | 39 +++++----- .../pmsh_service/mod/subscription_handler.py | 3 +- 8 files changed, 115 insertions(+), 90 deletions(-) (limited to 'components/pm-subscription-handler/pmsh_service/mod') diff --git a/components/pm-subscription-handler/pmsh_service/mod/__init__.py b/components/pm-subscription-handler/pmsh_service/mod/__init__.py index efc61aae..4c86ccda 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/__init__.py +++ b/components/pm-subscription-handler/pmsh_service/mod/__init__.py @@ -42,8 +42,11 @@ def _get_app(): def launch_api_server(app_config): connex_app = _get_app() connex_app.add_api('api/pmsh_swagger.yml') - connex_app.run(port=os.environ.get('PMSH_API_PORT', '8443'), - ssl_context=(app_config.cert_path, app_config.key_path)) + if app_config.enable_tls: + connex_app.run(port=os.environ.get('PMSH_API_PORT', '8443'), + ssl_context=app_config.cert_params) + else: + connex_app.run(port=os.environ.get('PMSH_API_PORT', '8443')) def create_app(): diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py index 2b92df41..bd052741 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py +++ b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py @@ -61,15 +61,14 @@ def _get_all_aai_nf_data(app_conf, **kwargs): nf_data = None try: session = requests.Session() - aai_endpoint = f'{_get_aai_service_url()}{"/aai/v16/query"}' + aai_endpoint = f'{_get_aai_service_url()}{"/aai/v19/query"}' logger.info('Fetching XNFs from AAI.') headers = {'accept': 'application/json', 'content-type': 'application/json', 'x-fromappid': 'dcae-pmsh', 'x-transactionid': kwargs['request_id'], 'InvocationID': kwargs['invocation_id'], - 'RequestID': kwargs['request_id'] - } + 'RequestID': kwargs['request_id']} json_data = """ {'start': ['network/pnfs', @@ -79,14 +78,16 @@ def _get_all_aai_nf_data(app_conf, **kwargs): response = session.put(aai_endpoint, headers=headers, auth=HTTPBasicAuth(app_conf.aaf_creds.get('aaf_id'), app_conf.aaf_creds.get('aaf_pass')), - data=json_data, params=params, verify=False) + data=json_data, params=params, + verify=(app_conf.ca_cert_path if app_conf.enable_tls else False), + cert=(app_conf.cert_params if app_conf.enable_tls else None)) response.raise_for_status() if response.ok: nf_data = json.loads(response.text) logger.info('Successfully fetched XNFs from AAI') logger.debug(f'XNFs from AAI: {nf_data}') except Exception as e: - logger.error(f'Failed to get XNFs from AAI: {e}') + logger.error(f'Failed to get XNFs from AAI: {e}', exc_info=True) return nf_data @@ -98,14 +99,13 @@ def _get_aai_service_url(): str: the AAI k8s service URL. Raises: - KeyError: if AAI env vars not found. + KeyError: if AAI env var not found. """ try: - aai_service = environ['AAI_SERVICE_HOST'] aai_ssl_port = environ['AAI_SERVICE_PORT'] - return f'https://{aai_service}:{aai_ssl_port}' + return f'https://aai:{aai_ssl_port}' except KeyError as e: - logger.error(f'Failed to get AAI env vars: {e}') + logger.error(f'Failed to get AAI env var: {e}', exc_info=True) raise @@ -134,6 +134,6 @@ def _filter_nf_data(nf_data, nf_filter): nf_name=nf['properties'].get(name_identifier), orchestration_status=orchestration_status)) except KeyError as e: - logger.error(f'Failed to parse AAI data: {e}') + logger.error(f'Failed to parse AAI data: {e}', exc_info=True) raise return nf_set diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py index b3957bc5..f1e8cf27 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py @@ -19,8 +19,6 @@ import json from enum import Enum -from requests import HTTPError - from mod import logger from mod.network_function import NetworkFunction @@ -46,26 +44,26 @@ def process_aai_events(mr_sub, mr_pub, app, app_conf): app_conf (AppConfig): the application configuration. """ app.app_context().push() + logger.info('Polling MR for XNF AAI events.') try: - aai_events = mr_sub.get_from_topic('AAI-EVENT') - if len(aai_events) != 0: - for entry in aai_events: + aai_events = mr_sub.get_from_topic('dcae_pmsh_aai_event') + if aai_events is not None and len(aai_events) != 0: + aai_events = [json.loads(e) for e in aai_events] + xnf_events = [e for e in aai_events if e['event-header']['entity-type'] == ( + XNFType.PNF.value or XNFType.VNF.value)] + for entry in xnf_events: logger.debug(f'AAI-EVENT entry: {entry}') - entry = json.loads(entry) - event_header = entry['event-header'] - aai_xnf = entry['entity'] - action = event_header['action'] - entity_type = event_header['entity-type'] - xnf_name = aai_xnf['pnf-name'] if entity_type == XNFType.PNF.value else aai_xnf[ - 'vnf-name'] - new_status = aai_xnf['orchestration-status'] + aai_entity = entry['entity'] + action = entry['event-header']['action'] + entity_type = entry['event-header']['entity-type'] + xnf_name = aai_entity['pnf-name'] if entity_type == XNFType.PNF.value \ + else aai_entity['vnf-name'] + new_status = aai_entity['orchestration-status'] if app_conf.nf_filter.is_nf_in_filter(xnf_name, new_status): _process_event(action, new_status, xnf_name, mr_pub, app_conf) - except HTTPError as e: - logger.debug(f'Failed to fetch AAI-EVENT messages from MR {e}') except Exception as e: - logger.debug(f'Exception trying to process AAI events {e}') + logger.error(f'Failed to process AAI event: {e}', exc_info=True) def _process_event(action, new_status, xnf_name, mr_pub, app_conf): diff --git a/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py b/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py index 01cb8dc3..3d02375d 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/exit_handler.py @@ -39,11 +39,14 @@ class ExitHandler: logger.debug(f'ExitHandler was called with signal number: {sig_num}.') current_sub = self.subscription_handler.current_sub if current_sub and current_sub.administrativeState == AdministrativeState.UNLOCKED.value: - for thread in self.periodic_tasks: - logger.debug(f'Cancelling periodic task with thread name: {thread.name}.') - thread.cancel() - current_sub.administrativeState = AdministrativeState.LOCKED.value - current_sub.process_subscription(current_sub.get_network_functions(), - self.subscription_handler.mr_pub, - self.subscription_handler.app_conf) + try: + for thread in self.periodic_tasks: + logger.debug(f'Cancelling periodic task with thread name: {thread.name}.') + thread.cancel() + current_sub.administrativeState = AdministrativeState.LOCKED.value + current_sub.process_subscription(current_sub.get_network_functions(), + self.subscription_handler.mr_pub, + self.subscription_handler.app_conf) + except Exception as e: + logger.error(f'Failed to shut down PMSH application: {e}', exc_info=True) ExitHandler.shutdown_signal_received = True diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py index 354d6b8d..50eb122b 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py +++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py @@ -67,6 +67,8 @@ class AppConfig(metaclass=ThreadSafeSingleton): raise self.aaf_creds = {'aaf_id': conf['config'].get('aaf_identity'), 'aaf_pass': conf['config'].get('aaf_password')} + self.enable_tls = conf['config'].get('enable_tls') + self.ca_cert_path = conf['config'].get('ca_cert_path') self.cert_path = conf['config'].get('cert_path') self.key_path = conf['config'].get('key_path') self.streams_subscribes = conf['config'].get('streams_subscribes') @@ -77,7 +79,7 @@ class AppConfig(metaclass=ThreadSafeSingleton): self.nf_filter = NetworkFunctionFilter(**self.subscription.nfFilter) @mdc_handler - @retry(wait=wait_fixed(2), stop=stop_after_attempt(5), retry=retry_if_exception_type(Exception)) + @retry(wait=wait_fixed(5), stop=stop_after_attempt(5), retry=retry_if_exception_type(Exception)) def _get_pmsh_config(self, **kwargs): """ Retrieves PMSH's configuration from Config binding service. If a non-2xx response is received, it retries after 2 seconds for 5 times before raising an exception. @@ -94,7 +96,7 @@ class AppConfig(metaclass=ThreadSafeSingleton): logger.info(f'Successfully fetched PMSH config from CBS: {config}') return config except Exception as err: - logger.error(f'Failed to get config from CBS: {err}') + logger.error(f'Failed to get config from CBS: {err}', exc_info=True) raise Exception def refresh_config(self): @@ -106,14 +108,16 @@ class AppConfig(metaclass=ThreadSafeSingleton): """ try: app_conf = self._get_pmsh_config() - except Exception: - logger.debug("Failed to refresh AppConfig data") - raise - self.subscription.administrativeState = \ - app_conf['policy']['subscription']['administrativeState'] - self.nf_filter.nf_names = app_conf['policy']['subscription']['nfFilter']['nfNames'] - self.nf_filter.nf_sw_version = app_conf['policy']['subscription']['nfFilter']['swVersions'] - logger.info("AppConfig data has been refreshed") + if "INVALID JSON" in app_conf.values(): + raise ValueError('Failed to refresh AppConfig: INVALID JSON') + self.subscription.administrativeState = app_conf['policy']['subscription'][ + 'administrativeState'] + self.nf_filter.nf_names = app_conf['policy']['subscription']['nfFilter']['nfNames'] + self.nf_filter.nf_sw_version = app_conf['policy']['subscription']['nfFilter'][ + 'swVersions'] + logger.info("AppConfig data has been refreshed") + except ValueError or Exception as e: + logger.error(e) def get_mr_sub(self, sub_name): """ @@ -129,9 +133,10 @@ class AppConfig(metaclass=ThreadSafeSingleton): KeyError: if the sub_name is not found. """ try: - return _MrSub(sub_name, self.aaf_creds, **self.streams_subscribes[sub_name]) + return _MrSub(sub_name, self.aaf_creds, self.ca_cert_path, + self.enable_tls, self.cert_params, **self.streams_subscribes[sub_name]) except KeyError as e: - logger.debug(e) + logger.error(f'Failed to get MrSub {sub_name}: {e}', exc_info=True) raise def get_mr_pub(self, pub_name): @@ -148,9 +153,10 @@ class AppConfig(metaclass=ThreadSafeSingleton): KeyError: if the sub_name is not found. """ try: - return _MrPub(pub_name, self.aaf_creds, **self.streams_publishes[pub_name]) + return _MrPub(pub_name, self.aaf_creds, self.ca_cert_path, + self.enable_tls, self.cert_params, **self.streams_publishes[pub_name]) except KeyError as e: - logger.debug(e) + logger.error(f'Failed to get MrPub {pub_name}: {e}', exc_info=True) raise @property @@ -159,29 +165,35 @@ class AppConfig(metaclass=ThreadSafeSingleton): Returns the tls artifact paths. Returns: - cert_path, key_path: the path to tls cert and key. + cert_path, key_path (tuple): the path to tls cert and key. """ return self.cert_path, self.key_path class _DmaapMrClient: - def __init__(self, aaf_creds, **kwargs): + def __init__(self, aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs): """ A DMaaP Message Router utility class. Sub classes should be invoked via the AppConfig.get_mr_{pub|sub} only. Args: - aaf_creds: a dict of aaf secure credentials. + aaf_creds (dict): a dict of aaf secure credentials. + ca_cert_path (str): path to the ca certificate. + enable_tls (bool): TLS if True, else False + cert_params (tuple): client side (cert, key) tuple. **kwargs: a dict of streams_{subscribes|publishes} data. """ self.topic_url = kwargs.get('dmaap_info').get('topic_url') self.aaf_id = aaf_creds.get('aaf_id') self.aaf_pass = aaf_creds.get('aaf_pass') + self.ca_cert_path = ca_cert_path + self.enable_tls = enable_tls + self.cert_params = cert_params class _MrPub(_DmaapMrClient): - def __init__(self, pub_name, aaf_creds, **kwargs): + def __init__(self, pub_name, aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs): self.pub_name = pub_name - super().__init__(aaf_creds, **kwargs) + super().__init__(aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs) @mdc_handler def publish_to_topic(self, event_json, **kwargs): @@ -189,7 +201,7 @@ class _MrPub(_DmaapMrClient): Publish the event to the DMaaP Message Router topic. Args: - event_json: the json data to be published. + event_json (dict): the json data to be published. Raises: Exception: if post request fails. @@ -200,36 +212,34 @@ class _MrPub(_DmaapMrClient): 'InvocationID': kwargs['invocation_id'], 'RequestID': kwargs['request_id'] } - logger.info(f'Attempting to publish event to {self.topic_url}') + logger.info(f'Publishing event to {self.topic_url}') response = session.post(self.topic_url, headers=headers, auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), json=event_json, - verify=False) + verify=(self.ca_cert_path if self.enable_tls else False)) response.raise_for_status() except Exception as e: - logger.error(f'Failed to publish message to MR topic: {e}') - raise + raise e def publish_subscription_event_data(self, subscription, xnf_name, app_conf): """ Update the Subscription dict with xnf and policy name then publish to DMaaP MR topic. Args: - subscription: the `Subscription` object. - xnf_name: the xnf to include in the event. + subscription (Subscription): the `Subscription` object. + xnf_name (str): the xnf to include in the event. app_conf (AppConfig): the application configuration. """ try: subscription_event = subscription.prepare_subscription_event(xnf_name, app_conf) self.publish_to_topic(subscription_event) except Exception as e: - logger.debug(f'pmsh_utils.publish_subscription_event_data : {e}') - raise + logger.error(f'Failed to publish to topic {self.topic_url}: {e}', exc_info=True) class _MrSub(_DmaapMrClient): - def __init__(self, sub_name, aaf_creds, **kwargs): + def __init__(self, sub_name, aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs): self.sub_name = sub_name - super().__init__(aaf_creds, **kwargs) + super().__init__(aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs) @mdc_handler def get_from_topic(self, consumer_id, consumer_group='dcae_pmsh_cg', timeout=1000, **kwargs): @@ -237,10 +247,10 @@ class _MrSub(_DmaapMrClient): Returns the json data from the MrTopic. Args: - consumer_id: Within your subscribers group, a name that uniquely + consumer_id (str): Within your subscribers group, a name that uniquely identifies your subscribers process. - consumer_group: A name that uniquely identifies your subscribers. - timeout: The request timeout value in mSec. + consumer_group (str): A name that uniquely identifies your subscribers. + timeout (int): The request timeout value in mSec. Returns: list[str]: the json response from DMaaP Message Router topic. @@ -250,11 +260,14 @@ class _MrSub(_DmaapMrClient): headers = {'accept': 'application/json', 'content-type': 'application/json', 'InvocationID': kwargs['invocation_id'], 'RequestID': kwargs['request_id']} - logger.debug(f'Fetching messages from MR topic: {self.topic_url}') + logger.info(f'Fetching messages from MR topic: {self.topic_url}') response = session.get(f'{self.topic_url}/{consumer_group}/{consumer_id}' f'?timeout={timeout}', auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), headers=headers, - verify=False) + verify=(self.ca_cert_path if self.enable_tls else False)) + if response.status_code == 503: + logger.error(f'MR Service is unavailable at present: {response.content}') + pass response.raise_for_status() if response.ok: return response.json() diff --git a/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py b/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py index 8a993828..73a5e7e8 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py @@ -47,9 +47,9 @@ class PolicyResponseHandler: """ self.app.app_context().push() administrative_state = self.app_conf.subscription.administrativeState - logger.info('Polling MR started for XNF activation/deactivation policy response events.') + logger.info('Polling MR for XNF activation/deactivation policy response events.') try: - response_data = self.mr_sub.get_from_topic('policy_response_consumer') + response_data = self.mr_sub.get_from_topic('dcae_pmsh_policy_cl_input') for data in response_data: data = json.loads(data) if data['status']['subscriptionName'] \ @@ -59,7 +59,7 @@ class PolicyResponseHandler: self._handle_response(self.app_conf.subscription.subscriptionName, administrative_state, nf_name, response_message) except Exception as err: - logger.error(f'Error trying to poll policy response topic on MR: {err}') + logger.error(f'Error trying to poll policy response topic on MR: {err}', exc_info=True) @staticmethod def _handle_response(subscription_name, administrative_state, nf_name, response_message): diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription.py b/components/pm-subscription-handler/pmsh_service/mod/subscription.py index b623cbdf..dbcd7a5e 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/subscription.py +++ b/components/pm-subscription-handler/pmsh_service/mod/subscription.py @@ -66,13 +66,17 @@ class Subscription: Returns: dict: the Subscription event to be published. """ - clean_sub = {k: v for k, v in self.__dict__.items() if k != 'nfFilter'} - sub_event = {'nfName': xnf_name, 'policyName': app_conf.operational_policy_name, - 'changeType': 'DELETE' - if self.administrativeState == AdministrativeState.LOCKED.value - else 'CREATE', 'closedLoopControlName': app_conf.control_loop_name, - 'subscription': clean_sub} - return sub_event + try: + clean_sub = {k: v for k, v in self.__dict__.items() if k != 'nfFilter'} + sub_event = {'nfName': xnf_name, 'policyName': app_conf.operational_policy_name, + 'changeType': 'DELETE' + if self.administrativeState == AdministrativeState.LOCKED.value + else 'CREATE', 'closedLoopControlName': app_conf.control_loop_name, + 'subscription': clean_sub} + return sub_event + except Exception as e: + logger.error(f'Failed to prep Sub event for xNF {xnf_name}: {e}', exc_info=True) + raise def create(self): """ Creates a subscription database entry @@ -94,7 +98,8 @@ class Subscription: f' returning this subscription..') return existing_subscription except Exception as e: - logger.debug(f'Failed to create subscription {self.subscriptionName} in the DB: {e}') + logger.error(f'Failed to create subscription {self.subscriptionName} in the DB: {e}', + exc_info=True) def add_network_function_to_subscription(self, nf): """ Associates a network function to a Subscription @@ -120,8 +125,8 @@ class Subscription: db.session.add(current_sub) db.session.commit() except Exception as e: - logger.debug(f'Failed to add nf {nf.nf_name} to subscription ' - f'{current_sub.subscription_name}: {e}') + logger.error(f'Failed to add nf {nf.nf_name} to subscription ' + f'{current_sub.subscription_name}: {e}', exc_info=True) logger.debug(f'Subscription {current_sub.subscription_name} now contains these XNFs:' f'{Subscription.get_nf_names_per_sub(current_sub.subscription_name)}') @@ -175,7 +180,8 @@ class Subscription: db.session.commit() except Exception as e: - logger.debug(f'Failed to update status of subscription: {self.subscriptionName}: {e}') + logger.error(f'Failed to update status of subscription: {self.subscriptionName}: {e}', + exc_info=True) def delete_subscription(self): """ Deletes a subscription and all its association from the database. A network function @@ -193,8 +199,8 @@ class Subscription: db.session.delete(subscription) db.session.commit() except Exception as e: - logger.debug(f'Failed to delete subscription: {self.subscriptionName} ' - f'and it\'s relations from the DB: {e}') + logger.error(f'Failed to delete subscription: {self.subscriptionName} ' + f'and it\'s relations from the DB: {e}', exc_info=True) def process_subscription(self, nfs, mr_pub, app_conf): action = 'Deactivate' @@ -211,7 +217,8 @@ class Subscription: mr_pub.publish_subscription_event_data(self, nf.nf_name, app_conf) logger.debug(f'Publishing Event to {action} ' f'Sub: {self.subscriptionName} for the nf: {nf.nf_name}') - self.add_network_function_to_subscription(nf) + if action == 'Activate': + self.add_network_function_to_subscription(nf) self.update_sub_nf_status(self.subscriptionName, sub_nf_state, nf.nf_name) except Exception as err: raise Exception(f'Error publishing activation event to MR: {err}') @@ -242,8 +249,8 @@ class Subscription: update({NfSubRelationalModel.nf_sub_status: status}, synchronize_session='evaluate') db.session.commit() except Exception as e: - logger.debug(f'Failed to update status of nf: {nf_name} for subscription: ' - f'{subscription_name}: {e}') + logger.error(f'Failed to update status of nf: {nf_name} for subscription: ' + f'{subscription_name}: {e}', exc_info=True) def _get_nf_models(self): nf_sub_relationships = NfSubRelationalModel.query.filter( diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py index 112f994b..74b6ac88 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py +++ b/components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py @@ -57,4 +57,5 @@ class SubscriptionHandler: self.aai_event_thread.cancel() except Exception as err: - logger.error(f'Error occurred during the activation/deactivation process {err}') + logger.error(f'Error occurred during the activation/deactivation process {err}', + exc_info=True) -- cgit 1.2.3-korg