summaryrefslogtreecommitdiffstats
path: root/components/pm-subscription-handler/pmsh_service/mod/pmsh_config.py
blob: 9c282ab744a4e9ee43255c74503996a80827bff7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# ============LICENSE_START===================================================
#  Copyright (C) 2021 Nordix Foundation.
# ============================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================

"""This module represents PMSH application configuration
   Singleton instance of configuration is created and stored,
   Enum representation is used for Message Router topics.
"""

from enum import Enum, unique

import requests
from onap_dcae_cbs_docker_client.client import get_all
from requests.auth import HTTPBasicAuth
from tenacity import wait_fixed, stop_after_attempt, retry, retry_if_exception_type

from mod import logger
from mod.pmsh_utils import mdc_handler


@unique
class MRTopic(Enum):
    """ Enum used to represent Message Router Topic"""
    AAI_SUBSCRIBER = 'aai_subscriber'
    POLICY_PM_PUBLISHER = 'policy_pm_publisher'
    POLICY_PM_SUBSCRIBER = 'policy_pm_subscriber'


class MetaSingleton(type):
    """ Metaclass used to create singleton object by overriding __call__() method """
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]

    @classmethod
    def get_cls_instance(mcs, cls_name):
        return mcs._instances[cls_name]


class AppConfig(metaclass=MetaSingleton):
    """ Object representation of the PMSH Application config. """

    def __init__(self):
        app_config = self._get_config()
        self.key_path = app_config['config'].get('key_path')
        self.cert_path = app_config['config'].get('cert_path')
        self.ca_cert_path = app_config['config'].get('ca_cert_path')
        self.enable_tls = app_config['config'].get('enable_tls')
        self.aaf_id = app_config['config'].get('aaf_identity')
        self.aaf_pass = app_config['config'].get('aaf_password')
        self.streams_publishes = app_config['config'].get('streams_publishes')
        self.streams_subscribes = app_config['config'].get('streams_subscribes')
        # TODO: aaf_creds variable should be removed on code cleanup
        self.aaf_creds = {'aaf_id': self.aaf_id, 'aaf_pass': self.aaf_pass}

    @staticmethod
    def get_instance():
        return AppConfig.get_cls_instance(AppConfig)

    @retry(wait=wait_fixed(5), stop=stop_after_attempt(5),
           retry=retry_if_exception_type(ValueError))
    def _get_config(self):

        """ Retrieves PMSH's configuration from Config binding service. If a non-2xx response
        is received, it retries after 2 seconds for 5 times before raising an exception.

        Returns:
            dict: Dictionary representation of the the service configuration

        Raises:
            Exception: If any error occurred pulling configuration from Config binding service.
        """
        try:
            logger.info('Attempting to fetch PMSH Configuration from CBS.')
            config = get_all()
            logger.info(f'Successfully fetched PMSH config from CBS: {config}')
            return config
        except Exception as e:
            logger.error(f'Failed to get config from CBS: {e}', exc_info=True)
            raise ValueError(e)

    @mdc_handler
    def publish_to_topic(self, mr_topic, event_json, **kwargs):
        """
        Publish the event to the DMaaP Message Router topic.

        Args:
            mr_topic (enum) : Message Router topic to publish.
            event_json (dict): the json data to be published.

        Raises:
            Exception: if post request fails.
        """
        try:
            session = requests.Session()
            topic_url = self.streams_publishes[mr_topic].get('dmaap_info').get('topic_url')
            headers = {'content-type': 'application/json', 'x-transactionid': kwargs['request_id'],
                       'InvocationID': kwargs['invocation_id'], 'RequestID': kwargs['request_id']}
            logger.info(f'Publishing event to MR topic: {topic_url}')
            response = session.post(topic_url, headers=headers,
                                    auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), json=event_json,
                                    verify=(self.ca_cert_path if self.enable_tls else False))
            response.raise_for_status()
        except Exception as e:
            raise e

    @mdc_handler
    def get_from_topic(self, mr_topic, consumer_id, consumer_group='dcae_pmsh_cg', timeout=5000,
                       **kwargs):
        """
        Returns the json data from the MrTopic.

        Args:
            mr_topic (enum) : Message Router topic to subscribe.
            consumer_id (str): Within your subscribers group, a name that uniquely
            identifies your subscribers process.
            consumer_group (str): A name that uniquely identifies your subscribers.
            timeout (int): The request timeout value in mSec.

        Returns:
            list[str]: the json response from DMaaP Message Router topic.
        """
        try:
            session = requests.Session()
            topic_url = self.streams_subscribes[mr_topic].get('dmaap_info').get('topic_url')
            headers = {'accept': 'application/json', 'content-type': 'application/json',
                       'InvocationID': kwargs['invocation_id'], 'RequestID': kwargs['request_id']}
            logger.info(f'Fetching messages from MR topic: {topic_url}')
            response = session.get(f'{topic_url}/{consumer_group}/{consumer_id}'
                                   f'?timeout={timeout}',
                                   auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), headers=headers,
                                   verify=(self.ca_cert_path if self.enable_tls else False))
            if response.status_code == 503:
                logger.error(f'MR Service is unavailable at present: {response.content}')
                pass
            response.raise_for_status()
            if response.ok:
                return response.json()
        except Exception as e:
            logger.error(f'Failed to fetch message from MR: {e}', exc_info=True)
            raise