diff options
author | ajay_dp001 <ajay.deep.singh@est.tech> | 2021-08-25 13:53:27 +0530 |
---|---|---|
committer | ajay_dp001 <ajay.deep.singh@est.tech> | 2021-10-04 20:53:55 +0530 |
commit | 053579b1a3a9d71fdc8fde5ed67600b453de083c (patch) | |
tree | cced45413b29daf5a718561f853ec880c3c718a3 /components/pm-subscription-handler | |
parent | 62a3787b8c2f00ad4ba681517970db3201717088 (diff) |
[DCAEGEN2] PMSH AppConfig Update
- Simplified existing PMSH Appconfig
- Major version bump for J release
Issue-ID: DCAEGEN2-2814
Signed-off-by: ajay_dp001 <ajay.deep.singh@est.tech>
Change-Id: I8ed572ccc7385cfdf91e51a126622821c113c53d
Diffstat (limited to 'components/pm-subscription-handler')
11 files changed, 283 insertions, 33 deletions
diff --git a/components/pm-subscription-handler/Changelog.md b/components/pm-subscription-handler/Changelog.md index bd5929cd..3112ac59 100755 --- a/components/pm-subscription-handler/Changelog.md +++ b/components/pm-subscription-handler/Changelog.md @@ -5,6 +5,9 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). +## [2.0.0] +### Changed +* Updated PMSH app configuration, simplified existing config (DCAEGEN2-2814) ## [1.3.2] ### Changed diff --git a/components/pm-subscription-handler/pmsh_service/mod/__init__.py b/components/pm-subscription-handler/pmsh_service/mod/__init__.py index 5f78ca19..1024417e 100644 --- a/components/pm-subscription-handler/pmsh_service/mod/__init__.py +++ b/components/pm-subscription-handler/pmsh_service/mod/__init__.py @@ -17,8 +17,8 @@ # ============LICENSE_END===================================================== import logging as logging import os -import ssl import pathlib +import ssl from urllib.parse import quote from connexion import App @@ -46,9 +46,9 @@ def launch_api_server(app_config): if app_config.enable_tls: logger.info('Launching secure http API server') ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) - ssl_ctx.load_cert_chain(app_config.cert_params[0], app_config.cert_params[1]) - connex_app.run(port=os.environ.get('PMSH_API_PORT', '8443'), - ssl_options=ssl_ctx, server="tornado") + ssl_ctx.load_cert_chain(app_config.cert_path, app_config.key_path) + connex_app.run(port=os.environ.get('PMSH_API_PORT', '8443'), ssl_options=ssl_ctx, + server="tornado") else: logger.info('Launching unsecure http API server') connex_app.run(port=os.environ.get('PMSH_API_PORT', '8443'), server="tornado") diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py index e53ea3a2..39adba46 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py +++ b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py @@ -1,5 +1,5 @@ # ============LICENSE_START=================================================== -# Copyright (C) 2019-2020 Nordix Foundation. +# Copyright (C) 2019-2021 Nordix Foundation. # ============================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -74,7 +74,8 @@ def _get_all_aai_nf_data(app_conf): app_conf.aaf_creds.get('aaf_pass')), data=data, params=params, verify=(app_conf.ca_cert_path if app_conf.enable_tls else False), - cert=(app_conf.cert_params if app_conf.enable_tls else None)) + cert=((app_conf.cert_path, + app_conf.key_path) if app_conf.enable_tls else None)) response.raise_for_status() if response.ok: nf_data = json.loads(response.text) @@ -177,7 +178,8 @@ def get_aai_model_data(app_conf, model_invariant_id, model_version_id, nf_name): auth=HTTPBasicAuth(app_conf.aaf_creds.get('aaf_id'), app_conf.aaf_creds.get('aaf_pass')), verify=(app_conf.ca_cert_path if app_conf.enable_tls else False), - cert=(app_conf.cert_params if app_conf.enable_tls else None)) + cert=((app_conf.cert_path, + app_conf.key_path) if app_conf.enable_tls else None)) response.raise_for_status() if response.ok: data = json.loads(response.text) diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_config.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_config.py new file mode 100644 index 00000000..9c282ab7 --- /dev/null +++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_config.py @@ -0,0 +1,158 @@ +# ============LICENSE_START=================================================== +# Copyright (C) 2021 Nordix Foundation. +# ============================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# ============LICENSE_END===================================================== + +"""This module represents PMSH application configuration + Singleton instance of configuration is created and stored, + Enum representation is used for Message Router topics. +""" + +from enum import Enum, unique + +import requests +from onap_dcae_cbs_docker_client.client import get_all +from requests.auth import HTTPBasicAuth +from tenacity import wait_fixed, stop_after_attempt, retry, retry_if_exception_type + +from mod import logger +from mod.pmsh_utils import mdc_handler + + +@unique +class MRTopic(Enum): + """ Enum used to represent Message Router Topic""" + AAI_SUBSCRIBER = 'aai_subscriber' + POLICY_PM_PUBLISHER = 'policy_pm_publisher' + POLICY_PM_SUBSCRIBER = 'policy_pm_subscriber' + + +class MetaSingleton(type): + """ Metaclass used to create singleton object by overriding __call__() method """ + _instances = {} + + def __call__(cls, *args, **kwargs): + if cls not in cls._instances: + cls._instances[cls] = super().__call__(*args, **kwargs) + return cls._instances[cls] + + @classmethod + def get_cls_instance(mcs, cls_name): + return mcs._instances[cls_name] + + +class AppConfig(metaclass=MetaSingleton): + """ Object representation of the PMSH Application config. """ + + def __init__(self): + app_config = self._get_config() + self.key_path = app_config['config'].get('key_path') + self.cert_path = app_config['config'].get('cert_path') + self.ca_cert_path = app_config['config'].get('ca_cert_path') + self.enable_tls = app_config['config'].get('enable_tls') + self.aaf_id = app_config['config'].get('aaf_identity') + self.aaf_pass = app_config['config'].get('aaf_password') + self.streams_publishes = app_config['config'].get('streams_publishes') + self.streams_subscribes = app_config['config'].get('streams_subscribes') + # TODO: aaf_creds variable should be removed on code cleanup + self.aaf_creds = {'aaf_id': self.aaf_id, 'aaf_pass': self.aaf_pass} + + @staticmethod + def get_instance(): + return AppConfig.get_cls_instance(AppConfig) + + @retry(wait=wait_fixed(5), stop=stop_after_attempt(5), + retry=retry_if_exception_type(ValueError)) + def _get_config(self): + + """ Retrieves PMSH's configuration from Config binding service. If a non-2xx response + is received, it retries after 2 seconds for 5 times before raising an exception. + + Returns: + dict: Dictionary representation of the the service configuration + + Raises: + Exception: If any error occurred pulling configuration from Config binding service. + """ + try: + logger.info('Attempting to fetch PMSH Configuration from CBS.') + config = get_all() + logger.info(f'Successfully fetched PMSH config from CBS: {config}') + return config + except Exception as e: + logger.error(f'Failed to get config from CBS: {e}', exc_info=True) + raise ValueError(e) + + @mdc_handler + def publish_to_topic(self, mr_topic, event_json, **kwargs): + """ + Publish the event to the DMaaP Message Router topic. + + Args: + mr_topic (enum) : Message Router topic to publish. + event_json (dict): the json data to be published. + + Raises: + Exception: if post request fails. + """ + try: + session = requests.Session() + topic_url = self.streams_publishes[mr_topic].get('dmaap_info').get('topic_url') + headers = {'content-type': 'application/json', 'x-transactionid': kwargs['request_id'], + 'InvocationID': kwargs['invocation_id'], 'RequestID': kwargs['request_id']} + logger.info(f'Publishing event to MR topic: {topic_url}') + response = session.post(topic_url, headers=headers, + auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), json=event_json, + verify=(self.ca_cert_path if self.enable_tls else False)) + response.raise_for_status() + except Exception as e: + raise e + + @mdc_handler + def get_from_topic(self, mr_topic, consumer_id, consumer_group='dcae_pmsh_cg', timeout=5000, + **kwargs): + """ + Returns the json data from the MrTopic. + + Args: + mr_topic (enum) : Message Router topic to subscribe. + consumer_id (str): Within your subscribers group, a name that uniquely + identifies your subscribers process. + consumer_group (str): A name that uniquely identifies your subscribers. + timeout (int): The request timeout value in mSec. + + Returns: + list[str]: the json response from DMaaP Message Router topic. + """ + try: + session = requests.Session() + topic_url = self.streams_subscribes[mr_topic].get('dmaap_info').get('topic_url') + headers = {'accept': 'application/json', 'content-type': 'application/json', + 'InvocationID': kwargs['invocation_id'], 'RequestID': kwargs['request_id']} + logger.info(f'Fetching messages from MR topic: {topic_url}') + response = session.get(f'{topic_url}/{consumer_group}/{consumer_id}' + f'?timeout={timeout}', + auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), headers=headers, + verify=(self.ca_cert_path if self.enable_tls else False)) + if response.status_code == 503: + logger.error(f'MR Service is unavailable at present: {response.content}') + pass + response.raise_for_status() + if response.ok: + return response.json() + except Exception as e: + logger.error(f'Failed to fetch message from MR: {e}', exc_info=True) + raise diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py index 9ddff315..26ada11b 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py +++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py @@ -18,23 +18,25 @@ import json import os import uuid +from functools import wraps from json import JSONDecodeError from os import getenv from threading import Timer import requests +from jsonschema import validate, ValidationError from onap_dcae_cbs_docker_client.client import get_all from onaplogging.mdcContext import MDC from requests.auth import HTTPBasicAuth from tenacity import wait_fixed, stop_after_attempt, retry, retry_if_exception_type -from jsonschema import validate, ValidationError from mod import logger from mod.subscription import Subscription -def mdc_handler(function): - def decorator(*args, **kwargs): +def mdc_handler(func): + @wraps(func) + def wrapper(*args, **kwargs): request_id = str(uuid.uuid1()) invocation_id = str(uuid.uuid1()) MDC.put('ServiceName', getenv('HOSTNAME')) @@ -43,23 +45,9 @@ def mdc_handler(function): kwargs['request_id'] = request_id kwargs['invocation_id'] = invocation_id - return function(*args, **kwargs) - - return decorator - - -class MySingleton(object): - instances = {} + return func(*args, **kwargs) - def __new__(cls, clz=None): - if clz is None: - if cls.__name__ not in MySingleton.instances: - MySingleton.instances[cls.__name__] = \ - object.__new__(cls) - return MySingleton.instances[cls.__name__] - MySingleton.instances[clz.__name__] = clz() - MySingleton.first = clz - return type(clz.__name__, (MySingleton,), dict(clz.__dict__)) + return wrapper def _load_sub_schema_from_file(): diff --git a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py index 4f2ca4a1..0b6544b5 100755 --- a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py +++ b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py @@ -20,6 +20,7 @@ from signal import signal, SIGTERM from mod import db, create_app, launch_api_server, logger from mod.exit_handler import ExitHandler +from mod.pmsh_config import AppConfig as NewAppConfig from mod.pmsh_utils import AppConfig, PeriodicTask from mod.policy_response_handler import PolicyResponseHandler from mod.subscription_handler import SubscriptionHandler @@ -32,6 +33,7 @@ def main(): app.app_context().push() db.create_all(app=app) app_conf = AppConfig() + pmsh_app_conf = NewAppConfig() policy_mr_pub = app_conf.get_mr_pub('policy_pm_publisher') policy_mr_sub = app_conf.get_mr_sub('policy_pm_subscriber') aai_event_mr_sub = app_conf.get_mr_sub('aai_subscriber') @@ -54,7 +56,7 @@ def main(): signal(SIGTERM, ExitHandler(periodic_tasks=periodic_tasks, app_conf=app_conf, subscription_handler=subscription_handler)) - launch_api_server(app_conf) + launch_api_server(pmsh_app_conf) except Exception as e: logger.error(f'Failed to initialise PMSH: {e}', exc_info=True) diff --git a/components/pm-subscription-handler/pom.xml b/components/pm-subscription-handler/pom.xml index a0b58f4d..89baa288 100644 --- a/components/pm-subscription-handler/pom.xml +++ b/components/pm-subscription-handler/pom.xml @@ -32,7 +32,7 @@ <groupId>org.onap.dcaegen2.services</groupId> <artifactId>pmsh</artifactId> <name>dcaegen2-services-pm-subscription-handler</name> - <version>1.3.2-SNAPSHOT</version> + <version>2.0.0-SNAPSHOT</version> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <sonar.sources>.</sonar.sources> diff --git a/components/pm-subscription-handler/setup.py b/components/pm-subscription-handler/setup.py index 2b8d24a9..d5d60401 100644 --- a/components/pm-subscription-handler/setup.py +++ b/components/pm-subscription-handler/setup.py @@ -22,7 +22,7 @@ from setuptools import setup, find_packages setup( name="pm_subscription_handler", - version="1.3.2", + version="2.0.0", packages=find_packages(), author="lego@est.tech", author_email="lego@est.tech", diff --git a/components/pm-subscription-handler/tests/base_setup.py b/components/pm-subscription-handler/tests/base_setup.py index 9e12f96e..e422ceac 100755 --- a/components/pm-subscription-handler/tests/base_setup.py +++ b/components/pm-subscription-handler/tests/base_setup.py @@ -1,5 +1,5 @@ # ============LICENSE_START=================================================== -# Copyright (C) 2020 Nordix Foundation. +# Copyright (C) 2020-2021 Nordix Foundation. # ============================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -23,6 +23,7 @@ from unittest.mock import patch, MagicMock from mod import create_app, db from mod.network_function import NetworkFunctionFilter from mod.pmsh_utils import AppConfig +from mod.pmsh_config import AppConfig as NewAppConfig def get_pmsh_config(file_path='data/cbs_data_1.json'): @@ -51,6 +52,10 @@ class BaseClassSetup(TestCase): self.app_conf = AppConfig() self.app_conf.nf_filter = NetworkFunctionFilter(**self.app_conf.subscription.nfFilter) + @patch('mod.pmsh_config.AppConfig._get_config', MagicMock(return_value=get_pmsh_config())) + def setUpAppConf(self): + self.pmsh_app_conf = NewAppConfig() + def tearDown(self): db.drop_all() diff --git a/components/pm-subscription-handler/tests/test_pmsh_config.py b/components/pm-subscription-handler/tests/test_pmsh_config.py new file mode 100644 index 00000000..deb867bf --- /dev/null +++ b/components/pm-subscription-handler/tests/test_pmsh_config.py @@ -0,0 +1,92 @@ +# ============LICENSE_START=================================================== +# Copyright (C) 2021 Nordix Foundation. +# ============================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# ============LICENSE_END===================================================== +from unittest.mock import Mock, patch + +import responses +from requests import Session + +from mod.pmsh_config import MRTopic, AppConfig +from tests.base_setup import BaseClassSetup + + +class PmshConfigTestCase(BaseClassSetup): + + @classmethod + def setUpClass(cls): + super().setUpClass() + + def setUp(self): + super().setUpAppConf() + self.mock_app = Mock() + + def tearDown(self): + super().tearDown() + + @classmethod + def tearDownClass(cls): + super().tearDownClass() + + def test_config_get_aaf_creds(self): + self.assertEqual(self.pmsh_app_conf.enable_tls, 'true') + self.assertEqual(self.pmsh_app_conf.aaf_id, 'dcae@dcae.onap.org') + self.assertEqual(self.pmsh_app_conf.aaf_pass, 'demo123456!') + + def test_config_get_cert_data(self): + self.assertEqual(self.pmsh_app_conf.key_path, '/opt/app/pmsh/etc/certs/key.pem') + self.assertEqual(self.pmsh_app_conf.cert_path, '/opt/app/pmsh/etc/certs/cert.pem') + self.assertEqual(self.pmsh_app_conf.ca_cert_path, '/opt/app/pmsh/etc/certs/cacert.pem') + + def test_singleton_instance_is_accessible_using_class_method(self): + my_singleton_instance = AppConfig.get_instance() + self.assertIsNotNone(my_singleton_instance) + self.assertIsInstance(my_singleton_instance, AppConfig) + + @patch.object(Session, 'post') + def test_mr_pub_publish_to_topic_success(self, mock_session): + mock_session.return_value.status_code = 200 + with patch('requests.Session.post') as session_post_call: + self.pmsh_app_conf.publish_to_topic(MRTopic.POLICY_PM_PUBLISHER.value, + {"key": "43c4ee19-6b8d-4279-a80f-c507850aae47"}) + session_post_call.assert_called_once() + + @responses.activate + def test_mr_pub_publish_to_topic_fail(self): + responses.add(responses.POST, + 'https://message-router:3905/events/org.onap.dmaap.mr.PM_SUBSCRIPTIONS', + json={"error": "Client Error"}, status=400) + with self.assertRaises(Exception): + self.pmsh_app_conf.publish_to_topic(MRTopic.POLICY_PM_PUBLISHER.value, + {"key": "43c4ee19-6b8d-4279-a80f-c507850aae47"}) + + @responses.activate + def test_mr_sub_get_from_topic_success(self): + responses.add(responses.GET, + 'https://message-router:3905/events/org.onap.dmaap.mr.PM_SUBSCRIPTIONS/' + 'dcae_pmsh_cg/1?timeout=5000', + json={"key": "43c4ee19-6b8d-4279-a80f-c507850aae47"}, status=200) + mr_topic_data = self.pmsh_app_conf.get_from_topic(MRTopic.POLICY_PM_SUBSCRIBER.value, 1) + self.assertIsNotNone(mr_topic_data) + + @responses.activate + def test_mr_sub_get_from_topic_fail(self): + responses.add(responses.GET, + 'https://message-router:3905/events/org.onap.dmaap.mr.PM_SUBSCRIPTIONS/' + 'dcae_pmsh_cg/1?timeout=5000', + json={"key": "43c4ee19-6b8d-4279-a80f-c507850aae47"}, status=400) + with self.assertRaises(Exception): + self.pmsh_app_conf.get_from_topic(MRTopic.POLICY_PM_SUBSCRIBER.value, 1) diff --git a/components/pm-subscription-handler/version.properties b/components/pm-subscription-handler/version.properties index ef20baaf..358e99ce 100644 --- a/components/pm-subscription-handler/version.properties +++ b/components/pm-subscription-handler/version.properties @@ -1,6 +1,6 @@ -major=1 -minor=3 -patch=2 +major=2 +minor=0 +patch=0 base_version=${major}.${minor}.${patch} release_version=${base_version} snapshot_version=${base_version}-SNAPSHOT |