diff options
author | Tony Hansen <tony@att.com> | 2022-03-07 20:34:07 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2022-03-07 20:34:07 +0000 |
commit | 6c9c8cc2a1a289ef0b4172d387376d0367549fe3 (patch) | |
tree | 86d10ee475a012545bcfc8b256f928f1996dc6de /components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py | |
parent | f67adad42f2c857ff76b52df39f0beb5c2cabee4 (diff) | |
parent | 5a2c43f2add2c6d4af8331a40b174684eac11b34 (diff) |
Merge "[PMSH] Cleaning up old App Config, subscription handler and it's subsequent calls"
Diffstat (limited to 'components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py')
-rwxr-xr-x | components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py | 313 |
1 files changed, 0 insertions, 313 deletions
diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py deleted file mode 100755 index d1790bbb..00000000 --- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py +++ /dev/null @@ -1,313 +0,0 @@ -# ============LICENSE_START=================================================== -# Copyright (C) 2019-2021 Nordix Foundation. -# ============================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# SPDX-License-Identifier: Apache-2.0 -# ============LICENSE_END===================================================== -import json -import os -import uuid -from functools import wraps -from json import JSONDecodeError -from os import getenv -from threading import Timer - -import requests -from jsonschema import validate, ValidationError -from onap_dcae_cbs_docker_client.client import get_all -from onaplogging.mdcContext import MDC -from requests.auth import HTTPBasicAuth -from tenacity import wait_fixed, stop_after_attempt, retry, retry_if_exception_type - -from mod import logger -from mod.subscription import Subscription - - -def mdc_handler(func): - @wraps(func) - def wrapper(*args, **kwargs): - request_id = str(uuid.uuid1()) - invocation_id = str(uuid.uuid1()) - MDC.put('ServiceName', getenv('HOSTNAME')) - MDC.put('RequestID', request_id) - MDC.put('InvocationID', invocation_id) - - kwargs['request_id'] = request_id - kwargs['invocation_id'] = invocation_id - return func(*args, **kwargs) - - return wrapper - - -def _load_sub_schema_from_file(): - try: - with open(os.path.join(os.path.dirname(__file__), 'sub_schema.json')) as sub: - return json.load(sub) - except OSError as err: - logger.error(f'Failed to read sub schema file: {err}', exc_info=True) - except JSONDecodeError as json_err: - logger.error(f'sub schema file is not a valid JSON file: {json_err}', exc_info=True) - - -class AppConfig: - INSTANCE = None - - def __init__(self): - conf = self._get_pmsh_config() - self.aaf_creds = {'aaf_id': conf['config'].get('aaf_identity'), - 'aaf_pass': conf['config'].get('aaf_password')} - self.enable_tls = conf['config'].get('enable_tls') - self.ca_cert_path = conf['config'].get('ca_cert_path') - self.cert_path = conf['config'].get('cert_path') - self.key_path = conf['config'].get('key_path') - self.streams_subscribes = conf['config'].get('streams_subscribes') - self.streams_publishes = conf['config'].get('streams_publishes') - self.operational_policy_name = conf['config'].get('operational_policy_name') - self.control_loop_name = conf['config'].get('control_loop_name') - self.sub_schema = _load_sub_schema_from_file() - self.subscription = Subscription(self.control_loop_name, - self.operational_policy_name, - **conf['config']['pmsh_policy']['subscription']) - self.nf_filter = None - - def __new__(cls, *args, **kwargs): - if AppConfig.INSTANCE is None: - AppConfig.INSTANCE = super().__new__(cls, *args, **kwargs) - return AppConfig.INSTANCE - - @mdc_handler - @retry(wait=wait_fixed(5), stop=stop_after_attempt(5), - retry=retry_if_exception_type(ValueError)) - def _get_pmsh_config(self, **kwargs): - """ Retrieves PMSH's configuration from Config binding service. If a non-2xx response - is received, it retries after 2 seconds for 5 times before raising an exception. - - Returns: - dict: Dictionary representation of the the service configuration - - Raises: - Exception: If any error occurred pulling configuration from Config binding service. - """ - try: - logger.info('Attempting to fetch PMSH Configuration from CBS.') - config = get_all() - logger.info(f'Successfully fetched PMSH config from CBS: {config}') - return config - except Exception as e: - logger.error(f'Failed to get config from CBS: {e}', exc_info=True) - raise ValueError(e) - - def validate_sub_schema(self): - """ - Validates schema of PMSH subscription - - Raises: - ValidationError: If the PMSH subscription schema is invalid - """ - sub_data = self.subscription.__dict__ - validate(instance=sub_data, schema=self.sub_schema) - nf_filter = sub_data["nfFilter"] - if not [filter_name for filter_name, val in nf_filter.items() if len(val) > 0]: - raise ValidationError("At least one filter within nfFilter must not be empty") - logger.debug("Subscription schema is valid.") - - def refresh_config(self): - """ - Update the relevant attributes of the AppConfig object. - - Raises: - Exception: if cbs request fails. - """ - try: - app_conf = self._get_pmsh_config() - self.operational_policy_name = app_conf['config'].get('operational_policy_name') - self.control_loop_name = app_conf['config'].get('control_loop_name') - self.subscription = Subscription(self.control_loop_name, - self.operational_policy_name, - **app_conf['config']['pmsh_policy']['subscription']) - logger.info("AppConfig data has been refreshed") - except Exception: - logger.error('Failed to refresh PMSH AppConfig') - raise - - def get_mr_sub(self, sub_name): - """ - Returns the MrSub object requested. - - Args: - sub_name: the key of the subscriber object. - - Returns: - MrSub: an Instance of an `MrSub` <MrSub> Object. - - Raises: - KeyError: if the sub_name is not found. - """ - try: - return _MrSub(sub_name, self.aaf_creds, self.ca_cert_path, - self.enable_tls, self.cert_params, **self.streams_subscribes[sub_name]) - except KeyError as e: - logger.error(f'Failed to get MrSub {sub_name}: {e}', exc_info=True) - raise - - def get_mr_pub(self, pub_name): - """ - Returns the MrPub object requested. - - Args: - pub_name: the key of the publisher object. - - Returns: - MrPub: an Instance of an `MrPub` <MrPub> Object. - - Raises: - KeyError: if the sub_name is not found. - """ - try: - return _MrPub(pub_name, self.aaf_creds, self.ca_cert_path, - self.enable_tls, self.cert_params, **self.streams_publishes[pub_name]) - except KeyError as e: - logger.error(f'Failed to get MrPub {pub_name}: {e}', exc_info=True) - raise - - @property - def cert_params(self): - """ - Returns the tls artifact paths. - - Returns: - cert_path, key_path (tuple): the path to tls cert and key. - """ - return self.cert_path, self.key_path - - -class _DmaapMrClient: - def __init__(self, aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs): - """ - A DMaaP Message Router utility class. - Sub classes should be invoked via the AppConfig.get_mr_{pub|sub} only. - Args: - aaf_creds (dict): a dict of aaf secure credentials. - ca_cert_path (str): path to the ca certificate. - enable_tls (bool): TLS if True, else False - cert_params (tuple): client side (cert, key) tuple. - **kwargs: a dict of streams_{subscribes|publishes} data. - """ - self.topic_url = kwargs.get('dmaap_info').get('topic_url') - self.aaf_id = aaf_creds.get('aaf_id') - self.aaf_pass = aaf_creds.get('aaf_pass') - self.ca_cert_path = ca_cert_path - self.enable_tls = enable_tls - self.cert_params = cert_params - - -class _MrPub(_DmaapMrClient): - def __init__(self, pub_name, aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs): - self.pub_name = pub_name - super().__init__(aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs) - - @mdc_handler - def publish_to_topic(self, event_json, **kwargs): - """ - Publish the event to the DMaaP Message Router topic. - - Args: - event_json (dict): the json data to be published. - - Raises: - Exception: if post request fails. - """ - try: - session = requests.Session() - headers = {'content-type': 'application/json', 'x-transactionid': kwargs['request_id'], - 'InvocationID': kwargs['invocation_id'], - 'RequestID': kwargs['request_id'] - } - logger.info(f'Publishing event to {self.topic_url}') - response = session.post(self.topic_url, headers=headers, - auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), json=event_json, - verify=(self.ca_cert_path if self.enable_tls else False)) - response.raise_for_status() - except Exception as e: - raise e - - def publish_subscription_event_data(self, subscription, nf): - """ - Update the Subscription dict with xnf and policy name then publish to DMaaP MR topic. - - Args: - subscription (Subscription): the `Subscription` <Subscription> object. - nf (NetworkFunction): the NetworkFunction to include in the event. - """ - try: - subscription_event = subscription.prepare_subscription_event(nf) - logger.debug(f'Subscription event: {subscription_event}') - self.publish_to_topic(subscription_event) - except Exception as e: - logger.error(f'Failed to publish to topic {self.topic_url}: {e}', exc_info=True) - raise e - - -class _MrSub(_DmaapMrClient): - def __init__(self, sub_name, aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs): - self.sub_name = sub_name - super().__init__(aaf_creds, ca_cert_path, enable_tls, cert_params, **kwargs) - - @mdc_handler - def get_from_topic(self, consumer_id, consumer_group='dcae_pmsh_cg', timeout=1000, **kwargs): - """ - Returns the json data from the MrTopic. - - Args: - consumer_id (str): Within your subscribers group, a name that uniquely - identifies your subscribers process. - consumer_group (str): A name that uniquely identifies your subscribers. - timeout (int): The request timeout value in mSec. - - Returns: - list[str]: the json response from DMaaP Message Router topic. - """ - try: - session = requests.Session() - headers = {'accept': 'application/json', 'content-type': 'application/json', - 'InvocationID': kwargs['invocation_id'], - 'RequestID': kwargs['request_id']} - logger.info(f'Fetching messages from MR topic: {self.topic_url}') - response = session.get(f'{self.topic_url}/{consumer_group}/{consumer_id}' - f'?timeout={timeout}', - auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), headers=headers, - verify=(self.ca_cert_path if self.enable_tls else False)) - if response.status_code == 503: - logger.error(f'MR Service is unavailable at present: {response.content}') - pass - response.raise_for_status() - if response.ok: - return response.json() - except Exception as e: - logger.error(f'Failed to fetch message from MR: {e}', exc_info=True) - raise - - -class PeriodicTask(Timer): - """ - See :class:`Timer`. - """ - - def run(self): - self.function(*self.args, **self.kwargs) - while not self.finished.wait(self.interval): - try: - self.function(*self.args, **self.kwargs) - except Exception as e: - logger.error(f'Exception in thread: {self.name}: {e}', exc_info=True) |