summaryrefslogtreecommitdiffstats
path: root/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py')
-rwxr-xr-xcomponents/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py46
1 files changed, 35 insertions, 11 deletions
diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
index 8db3c1f8..01661ad0 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
@@ -15,23 +15,39 @@
#
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================
-
import uuid
+from os import getenv
from threading import Timer
import requests
from onap_dcae_cbs_docker_client.client import get_all
+from onaplogging.mdcContext import MDC
from requests.auth import HTTPBasicAuth
from tenacity import wait_fixed, stop_after_attempt, retry, retry_if_exception_type
-import mod.pmsh_logging as logger
+from mod import logger
+
+
+def mdc_handler(function):
+ def decorator(*args, **kwargs):
+ request_id = str(uuid.uuid1())
+ invocation_id = str(uuid.uuid1())
+ MDC.put('ServiceName', getenv('HOSTNAME'))
+ MDC.put('RequestID', request_id)
+ MDC.put('InvocationID', invocation_id)
+
+ kwargs['request_id'] = request_id
+ kwargs['invocation_id'] = invocation_id
+ return function(*args, **kwargs)
+ return decorator
class ConfigHandler:
""" Handles retrieval of PMSH's configuration from Configbinding service."""
@staticmethod
+ @mdc_handler
@retry(wait=wait_fixed(2), stop=stop_after_attempt(5), retry=retry_if_exception_type(Exception))
- def get_pmsh_config():
+ def get_pmsh_config(**kwargs):
""" Retrieves PMSH's configuration from Config binding service. If a non-2xx response
is received, it retries after 2 seconds for 5 times before raising an exception.
@@ -42,11 +58,12 @@ class ConfigHandler:
Exception: If any error occurred pulling configuration from Config binding service.
"""
try:
+ logger.info('Fetching PMSH Configuration from CBS.')
config = get_all()
- logger.debug(f'PMSH config from CBS: {config}')
+ logger.info(f'Successfully fetched PMSH config from CBS: {config}')
return config
except Exception as err:
- logger.debug(f'Failed to get config from CBS: {err}')
+ logger.error(f'Failed to get config from CBS: {err}')
raise Exception
@@ -129,7 +146,8 @@ class _MrPub(_DmaapMrClient):
self.pub_name = pub_name
super().__init__(aaf_creds, **kwargs)
- def publish_to_topic(self, event_json):
+ @mdc_handler
+ def publish_to_topic(self, event_json, **kwargs):
"""
Publish the event to the DMaaP Message Router topic.
@@ -141,7 +159,10 @@ class _MrPub(_DmaapMrClient):
"""
try:
session = requests.Session()
- headers = {'content-type': 'application/json', 'x-transactionId': str(uuid.uuid1())}
+ headers = {'content-type': 'application/json', 'x-transactionid': kwargs['request_id'],
+ 'InvocationID': kwargs['invocation_id'],
+ 'RequestID': kwargs['request_id']
+ }
response = session.post(self.topic_url, headers=headers,
auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), json=event_json,
verify=False)
@@ -171,7 +192,8 @@ class _MrSub(_DmaapMrClient):
self.sub_name = sub_name
super().__init__(aaf_creds, **kwargs)
- def get_from_topic(self, consumer_id, consumer_group='dcae_pmsh_cg', timeout=1000):
+ @mdc_handler
+ def get_from_topic(self, consumer_id, consumer_group='dcae_pmsh_cg', timeout=1000, **kwargs):
"""
Returns the json data from the MrTopic.
@@ -187,8 +209,10 @@ class _MrSub(_DmaapMrClient):
topic_data = None
try:
session = requests.Session()
- headers = {'accept': 'application/json', 'content-type': 'application/json'}
- logger.debug(f'Request sent to MR topic: {self.topic_url}')
+ headers = {'accept': 'application/json', 'content-type': 'application/json',
+ 'InvocationID': kwargs['invocation_id'],
+ 'RequestID': kwargs['request_id']}
+ logger.debug(f'Fetching messages from MR topic: {self.topic_url}')
response = session.get(f'{self.topic_url}/{consumer_group}/{consumer_id}'
f'?timeout={timeout}',
auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), headers=headers,
@@ -197,7 +221,7 @@ class _MrSub(_DmaapMrClient):
if response.ok:
topic_data = response.json()
except Exception as e:
- logger.debug(e)
+ logger.error(f'Failed to fetch message from MR: {e}')
return topic_data