summaryrefslogtreecommitdiffstats
path: root/components/pm-subscription-handler/pmsh_service
diff options
context:
space:
mode:
Diffstat (limited to 'components/pm-subscription-handler/pmsh_service')
-rw-r--r--components/pm-subscription-handler/pmsh_service/mod/__init__.py8
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/aai_client.py8
-rw-r--r--components/pm-subscription-handler/pmsh_service/mod/pmsh_config.py158
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py26
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/pmsh_service_main.py4
5 files changed, 177 insertions, 27 deletions
diff --git a/components/pm-subscription-handler/pmsh_service/mod/__init__.py b/components/pm-subscription-handler/pmsh_service/mod/__init__.py
index 5f78ca19..1024417e 100644
--- a/components/pm-subscription-handler/pmsh_service/mod/__init__.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/__init__.py
@@ -17,8 +17,8 @@
# ============LICENSE_END=====================================================
import logging as logging
import os
-import ssl
import pathlib
+import ssl
from urllib.parse import quote
from connexion import App
@@ -46,9 +46,9 @@ def launch_api_server(app_config):
if app_config.enable_tls:
logger.info('Launching secure http API server')
ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
- ssl_ctx.load_cert_chain(app_config.cert_params[0], app_config.cert_params[1])
- connex_app.run(port=os.environ.get('PMSH_API_PORT', '8443'),
- ssl_options=ssl_ctx, server="tornado")
+ ssl_ctx.load_cert_chain(app_config.cert_path, app_config.key_path)
+ connex_app.run(port=os.environ.get('PMSH_API_PORT', '8443'), ssl_options=ssl_ctx,
+ server="tornado")
else:
logger.info('Launching unsecure http API server')
connex_app.run(port=os.environ.get('PMSH_API_PORT', '8443'), server="tornado")
diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py
index e53ea3a2..39adba46 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py
@@ -1,5 +1,5 @@
# ============LICENSE_START===================================================
-# Copyright (C) 2019-2020 Nordix Foundation.
+# Copyright (C) 2019-2021 Nordix Foundation.
# ============================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -74,7 +74,8 @@ def _get_all_aai_nf_data(app_conf):
app_conf.aaf_creds.get('aaf_pass')),
data=data, params=params,
verify=(app_conf.ca_cert_path if app_conf.enable_tls else False),
- cert=(app_conf.cert_params if app_conf.enable_tls else None))
+ cert=((app_conf.cert_path,
+ app_conf.key_path) if app_conf.enable_tls else None))
response.raise_for_status()
if response.ok:
nf_data = json.loads(response.text)
@@ -177,7 +178,8 @@ def get_aai_model_data(app_conf, model_invariant_id, model_version_id, nf_name):
auth=HTTPBasicAuth(app_conf.aaf_creds.get('aaf_id'),
app_conf.aaf_creds.get('aaf_pass')),
verify=(app_conf.ca_cert_path if app_conf.enable_tls else False),
- cert=(app_conf.cert_params if app_conf.enable_tls else None))
+ cert=((app_conf.cert_path,
+ app_conf.key_path) if app_conf.enable_tls else None))
response.raise_for_status()
if response.ok:
data = json.loads(response.text)
diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_config.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_config.py
new file mode 100644
index 00000000..9c282ab7
--- /dev/null
+++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_config.py
@@ -0,0 +1,158 @@
+# ============LICENSE_START===================================================
+# Copyright (C) 2021 Nordix Foundation.
+# ============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+# ============LICENSE_END=====================================================
+
+"""This module represents PMSH application configuration
+ Singleton instance of configuration is created and stored,
+ Enum representation is used for Message Router topics.
+"""
+
+from enum import Enum, unique
+
+import requests
+from onap_dcae_cbs_docker_client.client import get_all
+from requests.auth import HTTPBasicAuth
+from tenacity import wait_fixed, stop_after_attempt, retry, retry_if_exception_type
+
+from mod import logger
+from mod.pmsh_utils import mdc_handler
+
+
+@unique
+class MRTopic(Enum):
+ """ Enum used to represent Message Router Topic"""
+ AAI_SUBSCRIBER = 'aai_subscriber'
+ POLICY_PM_PUBLISHER = 'policy_pm_publisher'
+ POLICY_PM_SUBSCRIBER = 'policy_pm_subscriber'
+
+
+class MetaSingleton(type):
+ """ Metaclass used to create singleton object by overriding __call__() method """
+ _instances = {}
+
+ def __call__(cls, *args, **kwargs):
+ if cls not in cls._instances:
+ cls._instances[cls] = super().__call__(*args, **kwargs)
+ return cls._instances[cls]
+
+ @classmethod
+ def get_cls_instance(mcs, cls_name):
+ return mcs._instances[cls_name]
+
+
+class AppConfig(metaclass=MetaSingleton):
+ """ Object representation of the PMSH Application config. """
+
+ def __init__(self):
+ app_config = self._get_config()
+ self.key_path = app_config['config'].get('key_path')
+ self.cert_path = app_config['config'].get('cert_path')
+ self.ca_cert_path = app_config['config'].get('ca_cert_path')
+ self.enable_tls = app_config['config'].get('enable_tls')
+ self.aaf_id = app_config['config'].get('aaf_identity')
+ self.aaf_pass = app_config['config'].get('aaf_password')
+ self.streams_publishes = app_config['config'].get('streams_publishes')
+ self.streams_subscribes = app_config['config'].get('streams_subscribes')
+ # TODO: aaf_creds variable should be removed on code cleanup
+ self.aaf_creds = {'aaf_id': self.aaf_id, 'aaf_pass': self.aaf_pass}
+
+ @staticmethod
+ def get_instance():
+ return AppConfig.get_cls_instance(AppConfig)
+
+ @retry(wait=wait_fixed(5), stop=stop_after_attempt(5),
+ retry=retry_if_exception_type(ValueError))
+ def _get_config(self):
+
+ """ Retrieves PMSH's configuration from Config binding service. If a non-2xx response
+ is received, it retries after 2 seconds for 5 times before raising an exception.
+
+ Returns:
+ dict: Dictionary representation of the the service configuration
+
+ Raises:
+ Exception: If any error occurred pulling configuration from Config binding service.
+ """
+ try:
+ logger.info('Attempting to fetch PMSH Configuration from CBS.')
+ config = get_all()
+ logger.info(f'Successfully fetched PMSH config from CBS: {config}')
+ return config
+ except Exception as e:
+ logger.error(f'Failed to get config from CBS: {e}', exc_info=True)
+ raise ValueError(e)
+
+ @mdc_handler
+ def publish_to_topic(self, mr_topic, event_json, **kwargs):
+ """
+ Publish the event to the DMaaP Message Router topic.
+
+ Args:
+ mr_topic (enum) : Message Router topic to publish.
+ event_json (dict): the json data to be published.
+
+ Raises:
+ Exception: if post request fails.
+ """
+ try:
+ session = requests.Session()
+ topic_url = self.streams_publishes[mr_topic].get('dmaap_info').get('topic_url')
+ headers = {'content-type': 'application/json', 'x-transactionid': kwargs['request_id'],
+ 'InvocationID': kwargs['invocation_id'], 'RequestID': kwargs['request_id']}
+ logger.info(f'Publishing event to MR topic: {topic_url}')
+ response = session.post(topic_url, headers=headers,
+ auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), json=event_json,
+ verify=(self.ca_cert_path if self.enable_tls else False))
+ response.raise_for_status()
+ except Exception as e:
+ raise e
+
+ @mdc_handler
+ def get_from_topic(self, mr_topic, consumer_id, consumer_group='dcae_pmsh_cg', timeout=5000,
+ **kwargs):
+ """
+ Returns the json data from the MrTopic.
+
+ Args:
+ mr_topic (enum) : Message Router topic to subscribe.
+ consumer_id (str): Within your subscribers group, a name that uniquely
+ identifies your subscribers process.
+ consumer_group (str): A name that uniquely identifies your subscribers.
+ timeout (int): The request timeout value in mSec.
+
+ Returns:
+ list[str]: the json response from DMaaP Message Router topic.
+ """
+ try:
+ session = requests.Session()
+ topic_url = self.streams_subscribes[mr_topic].get('dmaap_info').get('topic_url')
+ headers = {'accept': 'application/json', 'content-type': 'application/json',
+ 'InvocationID': kwargs['invocation_id'], 'RequestID': kwargs['request_id']}
+ logger.info(f'Fetching messages from MR topic: {topic_url}')
+ response = session.get(f'{topic_url}/{consumer_group}/{consumer_id}'
+ f'?timeout={timeout}',
+ auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), headers=headers,
+ verify=(self.ca_cert_path if self.enable_tls else False))
+ if response.status_code == 503:
+ logger.error(f'MR Service is unavailable at present: {response.content}')
+ pass
+ response.raise_for_status()
+ if response.ok:
+ return response.json()
+ except Exception as e:
+ logger.error(f'Failed to fetch message from MR: {e}', exc_info=True)
+ raise
diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
index 9ddff315..26ada11b 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
@@ -18,23 +18,25 @@
import json
import os
import uuid
+from functools import wraps
from json import JSONDecodeError
from os import getenv
from threading import Timer
import requests
+from jsonschema import validate, ValidationError
from onap_dcae_cbs_docker_client.client import get_all
from onaplogging.mdcContext import MDC
from requests.auth import HTTPBasicAuth
from tenacity import wait_fixed, stop_after_attempt, retry, retry_if_exception_type
-from jsonschema import validate, ValidationError
from mod import logger
from mod.subscription import Subscription
-def mdc_handler(function):
- def decorator(*args, **kwargs):
+def mdc_handler(func):
+ @wraps(func)
+ def wrapper(*args, **kwargs):
request_id = str(uuid.uuid1())
invocation_id = str(uuid.uuid1())
MDC.put('ServiceName', getenv('HOSTNAME'))
@@ -43,23 +45,9 @@ def mdc_handler(function):
kwargs['request_id'] = request_id
kwargs['invocation_id'] = invocation_id
- return function(*args, **kwargs)
-
- return decorator
-
-
-class MySingleton(object):
- instances = {}
+ return func(*args, **kwargs)
- def __new__(cls, clz=None):
- if clz is None:
- if cls.__name__ not in MySingleton.instances:
- MySingleton.instances[cls.__name__] = \
- object.__new__(cls)
- return MySingleton.instances[cls.__name__]
- MySingleton.instances[clz.__name__] = clz()
- MySingleton.first = clz
- return type(clz.__name__, (MySingleton,), dict(clz.__dict__))
+ return wrapper
def _load_sub_schema_from_file():
diff --git a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py
index 4f2ca4a1..0b6544b5 100755
--- a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py
+++ b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py
@@ -20,6 +20,7 @@ from signal import signal, SIGTERM
from mod import db, create_app, launch_api_server, logger
from mod.exit_handler import ExitHandler
+from mod.pmsh_config import AppConfig as NewAppConfig
from mod.pmsh_utils import AppConfig, PeriodicTask
from mod.policy_response_handler import PolicyResponseHandler
from mod.subscription_handler import SubscriptionHandler
@@ -32,6 +33,7 @@ def main():
app.app_context().push()
db.create_all(app=app)
app_conf = AppConfig()
+ pmsh_app_conf = NewAppConfig()
policy_mr_pub = app_conf.get_mr_pub('policy_pm_publisher')
policy_mr_sub = app_conf.get_mr_sub('policy_pm_subscriber')
aai_event_mr_sub = app_conf.get_mr_sub('aai_subscriber')
@@ -54,7 +56,7 @@ def main():
signal(SIGTERM, ExitHandler(periodic_tasks=periodic_tasks,
app_conf=app_conf, subscription_handler=subscription_handler))
- launch_api_server(app_conf)
+ launch_api_server(pmsh_app_conf)
except Exception as e:
logger.error(f'Failed to initialise PMSH: {e}', exc_info=True)