1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
|
# ============LICENSE_START===================================================
# Copyright (C) 2021-2022 Nordix Foundation.
# ============================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================
"""This module represents PMSH application configuration
Singleton instance of configuration is created and stored,
Enum representation is used for Message Router topics.
"""
from enum import Enum, unique
import requests
from onap_dcae_cbs_docker_client.client import get_all
from requests.auth import HTTPBasicAuth
from tenacity import wait_fixed, stop_after_attempt, retry, retry_if_exception_type
from mod import logger, mdc_handler
@unique
class MRTopic(Enum):
""" Enum used to represent Message Router Topic"""
AAI_SUBSCRIBER = 'aai_subscriber'
POLICY_PM_PUBLISHER = 'policy_pm_publisher'
POLICY_PM_SUBSCRIBER = 'policy_pm_subscriber'
class MetaSingleton(type):
""" Metaclass used to create singleton object by overriding __call__() method """
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super().__call__(*args, **kwargs)
return cls._instances[cls]
@classmethod
def get_cls_instance(mcs, cls_name):
return mcs._instances[cls_name]
class AppConfig(metaclass=MetaSingleton):
""" Object representation of the PMSH Application config. """
def __init__(self):
app_config = self._get_config()
self.key_path = app_config['config'].get('key_path')
self.cert_path = app_config['config'].get('cert_path')
self.ca_cert_path = app_config['config'].get('ca_cert_path')
self.enable_tls = app_config['config'].get('enable_tls')
self.aaf_id = app_config['config'].get('aaf_identity')
self.aaf_pass = app_config['config'].get('aaf_password')
self.streams_publishes = app_config['config'].get('streams_publishes')
self.streams_subscribes = app_config['config'].get('streams_subscribes')
@staticmethod
def get_instance():
return AppConfig.get_cls_instance(AppConfig)
@retry(wait=wait_fixed(5), stop=stop_after_attempt(5),
retry=retry_if_exception_type(ValueError))
def _get_config(self):
""" Retrieves PMSH's configuration from Config binding service. If a non-2xx response
is received, it retries after 2 seconds for 5 times before raising an exception.
Returns:
dict: Dictionary representation of the the service configuration
Raises:
Exception: If any error occurred pulling configuration from Config binding service.
"""
try:
logger.info('Attempting to fetch PMSH Configuration from CBS.')
config = get_all()
logger.info(f'Successfully fetched PMSH config from CBS: {config}')
return config
except Exception as e:
logger.error(f'Failed to get config from CBS: {e}', exc_info=True)
raise ValueError(e) from e
@mdc_handler
def publish_to_topic(self, mr_topic, event_json, **kwargs):
"""
Publish the event to the DMaaP Message Router topic.
Args:
mr_topic (enum) : Message Router topic to publish.
event_json (dict): the json data to be published.
"""
session = requests.Session()
topic_url = self.streams_publishes[mr_topic].get('dmaap_info').get('topic_url')
headers = {'content-type': 'application/json', 'x-transactionid': kwargs['request_id'],
'InvocationID': kwargs['invocation_id'], 'RequestID': kwargs['request_id']}
logger.info(f'Publishing event to MR topic: {topic_url}')
response = session.post(topic_url, headers=headers,
auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), json=event_json,
verify=(self.ca_cert_path if self.enable_tls else False))
response.raise_for_status()
@mdc_handler
def get_from_topic(self, mr_topic, consumer_id, consumer_group='dcae_pmsh_cg', timeout=5000,
**kwargs):
"""
Returns the json data from the MrTopic.
Args:
mr_topic (enum) : Message Router topic to subscribe.
consumer_id (str): Within your subscribers group, a name that uniquely
identifies your subscribers process.
consumer_group (str): A name that uniquely identifies your subscribers.
timeout (int): The request timeout value in mSec.
Returns:
list[str]: the json response from DMaaP Message Router topic.
"""
try:
session = requests.Session()
topic_url = self.streams_subscribes[mr_topic].get('dmaap_info').get('topic_url')
headers = {'accept': 'application/json', 'content-type': 'application/json',
'InvocationID': kwargs['invocation_id'], 'RequestID': kwargs['request_id']}
logger.info(f'Fetching messages from MR topic: {topic_url}')
response = session.get(f'{topic_url}/{consumer_group}/{consumer_id}'
f'?timeout={timeout}',
auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), headers=headers,
verify=(self.ca_cert_path if self.enable_tls else False))
if response.status_code == 503:
logger.error(f'MR Service is unavailable at present: {response.content}')
response.raise_for_status()
if response.ok:
return response.json()
except Exception as e:
logger.error(f'Failed to fetch message from MR: {e}', exc_info=True)
raise
|