From 78ff88f9b3a3d32f941b3b9fedc2abfbaba291cb Mon Sep 17 00:00:00 2001 From: Alex Shatov Date: Thu, 27 Feb 2020 12:45:54 -0500 Subject: 5.1.0 policy-handler - policy-updates from new PDP DCAEGEN2-1851: - policy-handler now supports the policy-update notification from the new policy-engine thru DMaaP MR = no policy-filters - only policy-id values - see README for discoverable config settings of dmaap_mr client = DMaaP MR client has the same flexibility as policy_engine = set the query.timeout to high value like 15000 (default) - requests to DMaaP MR go through a single blocking connection - first catch-up only after draining the policy-updates from DMaaP MR on the first loop - safe parsing of messages from DMaaP MR - policy-engine changed the data type for policy-version field from int to string that is expected to have the semver value - related change to deployment-handler (DCAEGEN2-2085) has to be deployed to handle the non-numeric policyVersion - on new PDP API: http /policy_latest and policy-updates return the new data from the new PDP API with the following fields added/renamed by the policy-handler to keep other policy related parts intact in R4-R6 (see pdp_api/policy_utils.py) * policyName = policy_id + "." + policyVersion.replace(".","-") + ".xml" * policyVersion = str(metadata["policy-version"]) * "config" - is the renamed "properties" from the new PDP API response - enabled the /catch_up and the periodic auto-catch-up for the new PDP API - enabled GET /policies_latest - returns the latest policies for the deployed components - POST /policies_latest - still disabled since no support for the policy-filters is provided for the new PDP API - fixed hiding the Authorization value on comparing the configs - logging of secrets is now sha256 to see whether they changed - added X-ONAP-RequestID to headers the same way as X-ECOMP-RequestID - on policy-update process the removal first, then addition - changed the pool_connections=1 (number of pools) on PDP and DH sides == only a single destination is expected for each - log the exception as fatal into error.log - other minor fixes and refactoring - unit-test coverage 74% - integration testing is requested DCAEGEN2-1976: - policy-handler is enhanced to get user/password from env vars for PDP and DMaaP MR clients and overwriting the Authorization field in https headers received from the discoverable config = to override the Authorization value on policy_engine, set the environment vars $PDP_USER and $PDP_PWD in policy-handler container = to override the Authorization value on dmaap_mr, if using https and user-password authentication, set the environment vars $DMAAP_MR_USER and $DMAAP_MR_PWD in policy-handler container Change-Id: Iad8eab9e20e615a0e0d2822f4735dc64c50aa55c Signed-off-by: Alex Shatov Issue-ID: DCAEGEN2-1851 Issue-ID: DCAEGEN2-1976 --- policyhandler/pdp_api/policy_listener.py | 176 +++++++++++++++++++++++++++---- 1 file changed, 155 insertions(+), 21 deletions(-) (limited to 'policyhandler/pdp_api/policy_listener.py') diff --git a/policyhandler/pdp_api/policy_listener.py b/policyhandler/pdp_api/policy_listener.py index 9fa4695..0d33785 100644 --- a/policyhandler/pdp_api/policy_listener.py +++ b/policyhandler/pdp_api/policy_listener.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,39 +17,173 @@ """ policy-listener communicates with policy-engine -to receive push notifications +to receive push notifications through DMaaP MR on updates and removal of policies. -on receiving the policy-notifications, the policy-receiver +on receiving the policy-notifications, the policy-listener passes the notifications to policy-updater """ +import json import os +from threading import Event, Lock, Thread -from ..utils import ToBeImplementedException, Utils +from ..onap.audit import Audit, AuditResponseCode +from ..utils import Utils +from .dmaap_mr import DmaapMr +from .pdp_consts import (DEPLOYED_POLICIES, PDP_METADATA, PDP_POLICY_ID, + PDP_POLICY_VERSION, UNDEPLOYED_POLICIES) _LOGGER = Utils.get_logger(__file__) -class PolicyListener(object): - """listener to PolicyEngine""" +class PolicyListener(Thread): + """listener to DMaaP MR""" PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__))) + SLEEP_BEFORE_RESTARTING = 30 - def __init__(self, *_): + def __init__(self, audit, policy_updater): """listener to receive the policy notifications from PolicyEngine""" - _LOGGER.info("to_be_implemented") - raise ToBeImplementedException() + Thread.__init__(self, name="policy_listener", daemon=True) - def reconfigure(self, _): - """configure and reconfigure the listener""" - _LOGGER.info("to_be_implemented") - raise ToBeImplementedException() + self._policy_updater = policy_updater + self._lock = Lock() + self._run_event = Event() + self._keep_running = True + self._first_loop = True + + self._dmaap_mr = None + self.reconfigure(audit) + + def reconfigure(self, audit): + """configure and reconfigure the DMaaP MR""" + reconfigured = DmaapMr.reconfigure(audit) + if reconfigured and not self._first_loop: + with self._lock: + self._first_loop = True + return reconfigured def run(self): - """listen on web-socket and pass the policy notifications to policy-updater""" - _LOGGER.info("to_be_implemented") - raise ToBeImplementedException() - - def shutdown(self, _): - """Shutdown the policy-listener""" - _LOGGER.info("to_be_implemented") - raise ToBeImplementedException() + """listen on DMaaP MR and pass the policy notifications to policy-updater""" + _LOGGER.info("starting policy_listener...") + delayed_restarting = False + while True: + if not self._get_keep_running(): + break + + if delayed_restarting: + _LOGGER.info( + "going to sleep for %s secs before restarting policy-notifications", + PolicyListener.SLEEP_BEFORE_RESTARTING) + + self._run_event.clear() + self._run_event.wait(PolicyListener.SLEEP_BEFORE_RESTARTING) + if not self._get_keep_running(): + break + + audit = Audit(job_name="policy_update", + req_message="waiting for policy-notifications...", + retry_get_config=True) + + policy_updates = DmaapMr.get_policy_updates(audit) + + if not self._get_keep_running(): + audit.audit_done(result="exiting policy_listener") + break + + delayed_restarting = not audit.is_success() + if self._first_loop: + policy_updater = None + with self._lock: + if self._first_loop: + self._first_loop = False + policy_updater = self._policy_updater + if policy_updater is not None: + audit.req_message = "first catch_up" + _LOGGER.info(audit.info("first catch_up - ignoring policy-updates: {}" + .format(json.dumps(policy_updates)))) + policy_updater.catch_up(audit) + elif not policy_updates: + _LOGGER.info(audit.info( + "no policy-updates: {}".format(json.dumps(policy_updates)))) + audit.audit_done(result="no policy-updates") + else: + self._on_policy_update_message(audit, policy_updates) + + _LOGGER.info("exit policy_listener") + + def _get_keep_running(self): + """thread-safe check whether to continue running""" + with self._lock: + keep_running = self._keep_running + return keep_running + + def _on_policy_update_message(self, audit, policy_updates): + """received the notification from PDP""" + try: + _LOGGER.info("Received notification message: %s", json.dumps(policy_updates)) + if not policy_updates: + return + + policies_updated = [] + + for idx, pdp_update_msg in enumerate(policy_updates): + pdp_update_msg = Utils.safe_json_parse(pdp_update_msg) + + if not pdp_update_msg or not isinstance(pdp_update_msg, dict): + _LOGGER.warning(audit.warn( + "unexpected message from PDP: {}".format(json.dumps(pdp_update_msg)), + error_code=AuditResponseCode.DATA_ERROR)) + continue + + _LOGGER.debug("raw policy_update[%s]: %s", idx, json.dumps(pdp_update_msg)) + + deployed_policies = [ + {PDP_METADATA: {PDP_POLICY_ID: p_deployed.get(PDP_POLICY_ID), + PDP_POLICY_VERSION: p_deployed.get(PDP_POLICY_VERSION)}} + for p_deployed in pdp_update_msg.get(DEPLOYED_POLICIES, []) + if (p_deployed.get(PDP_POLICY_ID) is not None + and p_deployed.get(PDP_POLICY_VERSION) is not None)] + + undeployed_policies = [ + {PDP_METADATA: {PDP_POLICY_ID: p_undeployed.get(PDP_POLICY_ID), + PDP_POLICY_VERSION: p_undeployed.get(PDP_POLICY_VERSION)}} + for p_undeployed in pdp_update_msg.get(UNDEPLOYED_POLICIES, []) + if (p_undeployed.get(PDP_POLICY_ID) is not None + and p_undeployed.get(PDP_POLICY_VERSION) is not None)] + + if not deployed_policies and not undeployed_policies: + _LOGGER.warning(audit.warn( + "no policy deployed or undeployed: {}".format(json.dumps(pdp_update_msg)), + error_code=AuditResponseCode.DATA_ERROR)) + continue + + policy_update = {DEPLOYED_POLICIES: deployed_policies, + UNDEPLOYED_POLICIES: undeployed_policies} + _LOGGER.info(audit.info("policy_update[{}]: {}" + .format(idx, json.dumps(policy_update)))) + + policies_updated.append(policy_update) + + if not policies_updated: + _LOGGER.warning(audit.warn( + "erroneous notification from PDP: {}".format(json.dumps(policy_updates)), + error_code=AuditResponseCode.DATA_ERROR)) + return + + with self._lock: + policy_updater = self._policy_updater + if policy_updater is not None: + policy_updater.policy_update(audit, policies_updated) + except Exception as ex: + error_msg = "crash {} {} at {}: {}".format(type(ex).__name__, str(ex), + "on_policy_update_message", + json.dumps(policy_updates)) + _LOGGER.exception(audit.fatal(error_msg)) + + def shutdown(self, audit): + """Shutdown the policy_listener""" + _LOGGER.info(audit.info("shutdown policy_listener - no waiting...")) + with self._lock: + self._keep_running = False + self._policy_updater = None + self._run_event.set() -- cgit 1.2.3-korg