diff options
author | 2021-08-25 13:53:27 +0530 | |
---|---|---|
committer | 2021-10-04 20:53:55 +0530 | |
commit | 053579b1a3a9d71fdc8fde5ed67600b453de083c (patch) | |
tree | cced45413b29daf5a718561f853ec880c3c718a3 /components/pm-subscription-handler/pmsh_service | |
parent | 62a3787b8c2f00ad4ba681517970db3201717088 (diff) |
[DCAEGEN2] PMSH AppConfig Update
- Simplified existing PMSH Appconfig
- Major version bump for J release
Issue-ID: DCAEGEN2-2814
Signed-off-by: ajay_dp001 <ajay.deep.singh@est.tech>
Change-Id: I8ed572ccc7385cfdf91e51a126622821c113c53d
Diffstat (limited to 'components/pm-subscription-handler/pmsh_service')
5 files changed, 177 insertions, 27 deletions
diff --git a/components/pm-subscription-handler/pmsh_service/mod/__init__.py b/components/pm-subscription-handler/pmsh_service/mod/__init__.py index 5f78ca19..1024417e 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/__init__.py +++ b/components/pm-subscription-handler/pmsh_service/mod/__init__.py @@ -17,8 +17,8 @@ # ============LICENSE_END===================================================== import logging as logging import os -import ssl import pathlib +import ssl from urllib.parse import quote from connexion import App @@ -46,9 +46,9 @@ def launch_api_server(app_config): if app_config.enable_tls: logger.info('Launching secure http API server') ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) - ssl_ctx.load_cert_chain(app_config.cert_params[0], app_config.cert_params[1]) - connex_app.run(port=os.environ.get('PMSH_API_PORT', '8443'), - ssl_options=ssl_ctx, server="tornado") + ssl_ctx.load_cert_chain(app_config.cert_path, app_config.key_path) + connex_app.run(port=os.environ.get('PMSH_API_PORT', '8443'), ssl_options=ssl_ctx, + server="tornado") else: logger.info('Launching unsecure http API server') connex_app.run(port=os.environ.get('PMSH_API_PORT', '8443'), server="tornado") diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py index e53ea3a2..39adba46 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py +++ b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py @@ -1,5 +1,5 @@ # ============LICENSE_START=================================================== -# Copyright (C) 2019-2020 Nordix Foundation. +# Copyright (C) 2019-2021 Nordix Foundation. # ============================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -74,7 +74,8 @@ def _get_all_aai_nf_data(app_conf): app_conf.aaf_creds.get('aaf_pass')), data=data, params=params, verify=(app_conf.ca_cert_path if app_conf.enable_tls else False), - cert=(app_conf.cert_params if app_conf.enable_tls else None)) + cert=((app_conf.cert_path, + app_conf.key_path) if app_conf.enable_tls else None)) response.raise_for_status() if response.ok: nf_data = json.loads(response.text) @@ -177,7 +178,8 @@ def get_aai_model_data(app_conf, model_invariant_id, model_version_id, nf_name): auth=HTTPBasicAuth(app_conf.aaf_creds.get('aaf_id'), app_conf.aaf_creds.get('aaf_pass')), verify=(app_conf.ca_cert_path if app_conf.enable_tls else False), - cert=(app_conf.cert_params if app_conf.enable_tls else None)) + cert=((app_conf.cert_path, + app_conf.key_path) if app_conf.enable_tls else None)) response.raise_for_status() if response.ok: data = json.loads(response.text) diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_config.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_config.py new file mode 100644 index 00000000..9c282ab7 --- /dev/null +++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_config.py @@ -0,0 +1,158 @@ +# ============LICENSE_START=================================================== +# Copyright (C) 2021 Nordix Foundation. +# ============================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# ============LICENSE_END===================================================== + +"""This module represents PMSH application configuration + Singleton instance of configuration is created and stored, + Enum representation is used for Message Router topics. +""" + +from enum import Enum, unique + +import requests +from onap_dcae_cbs_docker_client.client import get_all +from requests.auth import HTTPBasicAuth +from tenacity import wait_fixed, stop_after_attempt, retry, retry_if_exception_type + +from mod import logger +from mod.pmsh_utils import mdc_handler + + +@unique +class MRTopic(Enum): + """ Enum used to represent Message Router Topic""" + AAI_SUBSCRIBER = 'aai_subscriber' + POLICY_PM_PUBLISHER = 'policy_pm_publisher' + POLICY_PM_SUBSCRIBER = 'policy_pm_subscriber' + + +class MetaSingleton(type): + """ Metaclass used to create singleton object by overriding __call__() method """ + _instances = {} + + def __call__(cls, *args, **kwargs): + if cls not in cls._instances: + cls._instances[cls] = super().__call__(*args, **kwargs) + return cls._instances[cls] + + @classmethod + def get_cls_instance(mcs, cls_name): + return mcs._instances[cls_name] + + +class AppConfig(metaclass=MetaSingleton): + """ Object representation of the PMSH Application config. """ + + def __init__(self): + app_config = self._get_config() + self.key_path = app_config['config'].get('key_path') + self.cert_path = app_config['config'].get('cert_path') + self.ca_cert_path = app_config['config'].get('ca_cert_path') + self.enable_tls = app_config['config'].get('enable_tls') + self.aaf_id = app_config['config'].get('aaf_identity') + self.aaf_pass = app_config['config'].get('aaf_password') + self.streams_publishes = app_config['config'].get('streams_publishes') + self.streams_subscribes = app_config['config'].get('streams_subscribes') + # TODO: aaf_creds variable should be removed on code cleanup + self.aaf_creds = {'aaf_id': self.aaf_id, 'aaf_pass': self.aaf_pass} + + @staticmethod + def get_instance(): + return AppConfig.get_cls_instance(AppConfig) + + @retry(wait=wait_fixed(5), stop=stop_after_attempt(5), + retry=retry_if_exception_type(ValueError)) + def _get_config(self): + + """ Retrieves PMSH's configuration from Config binding service. If a non-2xx response + is received, it retries after 2 seconds for 5 times before raising an exception. + + Returns: + dict: Dictionary representation of the the service configuration + + Raises: + Exception: If any error occurred pulling configuration from Config binding service. + """ + try: + logger.info('Attempting to fetch PMSH Configuration from CBS.') + config = get_all() + logger.info(f'Successfully fetched PMSH config from CBS: {config}') + return config + except Exception as e: + logger.error(f'Failed to get config from CBS: {e}', exc_info=True) + raise ValueError(e) + + @mdc_handler + def publish_to_topic(self, mr_topic, event_json, **kwargs): + """ + Publish the event to the DMaaP Message Router topic. + + Args: + mr_topic (enum) : Message Router topic to publish. + event_json (dict): the json data to be published. + + Raises: + Exception: if post request fails. + """ + try: + session = requests.Session() + topic_url = self.streams_publishes[mr_topic].get('dmaap_info').get('topic_url') + headers = {'content-type': 'application/json', 'x-transactionid': kwargs['request_id'], + 'InvocationID': kwargs['invocation_id'], 'RequestID': kwargs['request_id']} + logger.info(f'Publishing event to MR topic: {topic_url}') + response = session.post(topic_url, headers=headers, + auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), json=event_json, + verify=(self.ca_cert_path if self.enable_tls else False)) + response.raise_for_status() + except Exception as e: + raise e + + @mdc_handler + def get_from_topic(self, mr_topic, consumer_id, consumer_group='dcae_pmsh_cg', timeout=5000, + **kwargs): + """ + Returns the json data from the MrTopic. + + Args: + mr_topic (enum) : Message Router topic to subscribe. + consumer_id (str): Within your subscribers group, a name that uniquely + identifies your subscribers process. + consumer_group (str): A name that uniquely identifies your subscribers. + timeout (int): The request timeout value in mSec. + + Returns: + list[str]: the json response from DMaaP Message Router topic. + """ + try: + session = requests.Session() + topic_url = self.streams_subscribes[mr_topic].get('dmaap_info').get('topic_url') + headers = {'accept': 'application/json', 'content-type': 'application/json', + 'InvocationID': kwargs['invocation_id'], 'RequestID': kwargs['request_id']} + logger.info(f'Fetching messages from MR topic: {topic_url}') + response = session.get(f'{topic_url}/{consumer_group}/{consumer_id}' + f'?timeout={timeout}', + auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), headers=headers, + verify=(self.ca_cert_path if self.enable_tls else False)) + if response.status_code == 503: + logger.error(f'MR Service is unavailable at present: {response.content}') + pass + response.raise_for_status() + if response.ok: + return response.json() + except Exception as e: + logger.error(f'Failed to fetch message from MR: {e}', exc_info=True) + raise diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py index 9ddff315..26ada11b 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py +++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py @@ -18,23 +18,25 @@ import json import os import uuid +from functools import wraps from json import JSONDecodeError from os import getenv from threading import Timer import requests +from jsonschema import validate, ValidationError from onap_dcae_cbs_docker_client.client import get_all from onaplogging.mdcContext import MDC from requests.auth import HTTPBasicAuth from tenacity import wait_fixed, stop_after_attempt, retry, retry_if_exception_type -from jsonschema import validate, ValidationError from mod import logger from mod.subscription import Subscription -def mdc_handler(function): - def decorator(*args, **kwargs): +def mdc_handler(func): + @wraps(func) + def wrapper(*args, **kwargs): request_id = str(uuid.uuid1()) invocation_id = str(uuid.uuid1()) MDC.put('ServiceName', getenv('HOSTNAME')) @@ -43,23 +45,9 @@ def mdc_handler(function): kwargs['request_id'] = request_id kwargs['invocation_id'] = invocation_id - return function(*args, **kwargs) - - return decorator - - -class MySingleton(object): - instances = {} + return func(*args, **kwargs) - def __new__(cls, clz=None): - if clz is None: - if cls.__name__ not in MySingleton.instances: - MySingleton.instances[cls.__name__] = \ - object.__new__(cls) - return MySingleton.instances[cls.__name__] - MySingleton.instances[clz.__name__] = clz() - MySingleton.first = clz - return type(clz.__name__, (MySingleton,), dict(clz.__dict__)) + return wrapper def _load_sub_schema_from_file(): diff --git a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py index 4f2ca4a1..0b6544b5 100755 --- a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py +++ b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py @@ -20,6 +20,7 @@ from signal import signal, SIGTERM from mod import db, create_app, launch_api_server, logger from mod.exit_handler import ExitHandler +from mod.pmsh_config import AppConfig as NewAppConfig from mod.pmsh_utils import AppConfig, PeriodicTask from mod.policy_response_handler import PolicyResponseHandler from mod.subscription_handler import SubscriptionHandler @@ -32,6 +33,7 @@ def main(): app.app_context().push() db.create_all(app=app) app_conf = AppConfig() + pmsh_app_conf = NewAppConfig() policy_mr_pub = app_conf.get_mr_pub('policy_pm_publisher') policy_mr_sub = app_conf.get_mr_sub('policy_pm_subscriber') aai_event_mr_sub = app_conf.get_mr_sub('aai_subscriber') @@ -54,7 +56,7 @@ def main(): signal(SIGTERM, ExitHandler(periodic_tasks=periodic_tasks, app_conf=app_conf, subscription_handler=subscription_handler)) - launch_api_server(app_conf) + launch_api_server(pmsh_app_conf) except Exception as e: logger.error(f'Failed to initialise PMSH: {e}', exc_info=True) |