diff options
Diffstat (limited to 'components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py')
-rwxr-xr-x | components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py | 46 |
1 files changed, 35 insertions, 11 deletions
diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py index 8db3c1f8..01661ad0 100755 --- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py +++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py @@ -15,23 +15,39 @@ # # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END===================================================== - import uuid +from os import getenv from threading import Timer import requests from onap_dcae_cbs_docker_client.client import get_all +from onaplogging.mdcContext import MDC from requests.auth import HTTPBasicAuth from tenacity import wait_fixed, stop_after_attempt, retry, retry_if_exception_type -import mod.pmsh_logging as logger +from mod import logger + + +def mdc_handler(function): + def decorator(*args, **kwargs): + request_id = str(uuid.uuid1()) + invocation_id = str(uuid.uuid1()) + MDC.put('ServiceName', getenv('HOSTNAME')) + MDC.put('RequestID', request_id) + MDC.put('InvocationID', invocation_id) + + kwargs['request_id'] = request_id + kwargs['invocation_id'] = invocation_id + return function(*args, **kwargs) + return decorator class ConfigHandler: """ Handles retrieval of PMSH's configuration from Configbinding service.""" @staticmethod + @mdc_handler @retry(wait=wait_fixed(2), stop=stop_after_attempt(5), retry=retry_if_exception_type(Exception)) - def get_pmsh_config(): + def get_pmsh_config(**kwargs): """ Retrieves PMSH's configuration from Config binding service. If a non-2xx response is received, it retries after 2 seconds for 5 times before raising an exception. @@ -42,11 +58,12 @@ class ConfigHandler: Exception: If any error occurred pulling configuration from Config binding service. """ try: + logger.info('Fetching PMSH Configuration from CBS.') config = get_all() - logger.debug(f'PMSH config from CBS: {config}') + logger.info(f'Successfully fetched PMSH config from CBS: {config}') return config except Exception as err: - logger.debug(f'Failed to get config from CBS: {err}') + logger.error(f'Failed to get config from CBS: {err}') raise Exception @@ -129,7 +146,8 @@ class _MrPub(_DmaapMrClient): self.pub_name = pub_name super().__init__(aaf_creds, **kwargs) - def publish_to_topic(self, event_json): + @mdc_handler + def publish_to_topic(self, event_json, **kwargs): """ Publish the event to the DMaaP Message Router topic. @@ -141,7 +159,10 @@ class _MrPub(_DmaapMrClient): """ try: session = requests.Session() - headers = {'content-type': 'application/json', 'x-transactionId': str(uuid.uuid1())} + headers = {'content-type': 'application/json', 'x-transactionid': kwargs['request_id'], + 'InvocationID': kwargs['invocation_id'], + 'RequestID': kwargs['request_id'] + } response = session.post(self.topic_url, headers=headers, auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), json=event_json, verify=False) @@ -171,7 +192,8 @@ class _MrSub(_DmaapMrClient): self.sub_name = sub_name super().__init__(aaf_creds, **kwargs) - def get_from_topic(self, consumer_id, consumer_group='dcae_pmsh_cg', timeout=1000): + @mdc_handler + def get_from_topic(self, consumer_id, consumer_group='dcae_pmsh_cg', timeout=1000, **kwargs): """ Returns the json data from the MrTopic. @@ -187,8 +209,10 @@ class _MrSub(_DmaapMrClient): topic_data = None try: session = requests.Session() - headers = {'accept': 'application/json', 'content-type': 'application/json'} - logger.debug(f'Request sent to MR topic: {self.topic_url}') + headers = {'accept': 'application/json', 'content-type': 'application/json', + 'InvocationID': kwargs['invocation_id'], + 'RequestID': kwargs['request_id']} + logger.debug(f'Fetching messages from MR topic: {self.topic_url}') response = session.get(f'{self.topic_url}/{consumer_group}/{consumer_id}' f'?timeout={timeout}', auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), headers=headers, @@ -197,7 +221,7 @@ class _MrSub(_DmaapMrClient): if response.ok: topic_data = response.json() except Exception as e: - logger.debug(e) + logger.error(f'Failed to fetch message from MR: {e}') return topic_data |