aboutsummaryrefslogtreecommitdiffstats
path: root/policyhandler/pdp_api/policy_listener.py
diff options
context:
space:
mode:
authorAlex Shatov <alexs@att.com>2020-02-27 12:45:54 -0500
committerAlex Shatov <alexs@att.com>2020-02-27 12:45:54 -0500
commit78ff88f9b3a3d32f941b3b9fedc2abfbaba291cb (patch)
tree5670dddc0e0cd9f793d419420b61ad0559639497 /policyhandler/pdp_api/policy_listener.py
parent715fc8a36ac1809cd3e36cbb6cfb7107ebb038ea (diff)
5.1.0 policy-handler - policy-updates from new PDP5.1.0
DCAEGEN2-1851: - policy-handler now supports the policy-update notification from the new policy-engine thru DMaaP MR = no policy-filters - only policy-id values - see README for discoverable config settings of dmaap_mr client = DMaaP MR client has the same flexibility as policy_engine = set the query.timeout to high value like 15000 (default) - requests to DMaaP MR go through a single blocking connection - first catch-up only after draining the policy-updates from DMaaP MR on the first loop - safe parsing of messages from DMaaP MR - policy-engine changed the data type for policy-version field from int to string that is expected to have the semver value - related change to deployment-handler (DCAEGEN2-2085) has to be deployed to handle the non-numeric policyVersion - on new PDP API: http /policy_latest and policy-updates return the new data from the new PDP API with the following fields added/renamed by the policy-handler to keep other policy related parts intact in R4-R6 (see pdp_api/policy_utils.py) * policyName = policy_id + "." + policyVersion.replace(".","-") + ".xml" * policyVersion = str(metadata["policy-version"]) * "config" - is the renamed "properties" from the new PDP API response - enabled the /catch_up and the periodic auto-catch-up for the new PDP API - enabled GET /policies_latest - returns the latest policies for the deployed components - POST /policies_latest - still disabled since no support for the policy-filters is provided for the new PDP API - fixed hiding the Authorization value on comparing the configs - logging of secrets is now sha256 to see whether they changed - added X-ONAP-RequestID to headers the same way as X-ECOMP-RequestID - on policy-update process the removal first, then addition - changed the pool_connections=1 (number of pools) on PDP and DH sides == only a single destination is expected for each - log the exception as fatal into error.log - other minor fixes and refactoring - unit-test coverage 74% - integration testing is requested DCAEGEN2-1976: - policy-handler is enhanced to get user/password from env vars for PDP and DMaaP MR clients and overwriting the Authorization field in https headers received from the discoverable config = to override the Authorization value on policy_engine, set the environment vars $PDP_USER and $PDP_PWD in policy-handler container = to override the Authorization value on dmaap_mr, if using https and user-password authentication, set the environment vars $DMAAP_MR_USER and $DMAAP_MR_PWD in policy-handler container Change-Id: Iad8eab9e20e615a0e0d2822f4735dc64c50aa55c Signed-off-by: Alex Shatov <alexs@att.com> Issue-ID: DCAEGEN2-1851 Issue-ID: DCAEGEN2-1976
Diffstat (limited to 'policyhandler/pdp_api/policy_listener.py')
-rw-r--r--policyhandler/pdp_api/policy_listener.py176
1 files changed, 155 insertions, 21 deletions
diff --git a/policyhandler/pdp_api/policy_listener.py b/policyhandler/pdp_api/policy_listener.py
index 9fa4695..0d33785 100644
--- a/policyhandler/pdp_api/policy_listener.py
+++ b/policyhandler/pdp_api/policy_listener.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -17,39 +17,173 @@
"""
policy-listener communicates with policy-engine
-to receive push notifications
+to receive push notifications through DMaaP MR
on updates and removal of policies.
-on receiving the policy-notifications, the policy-receiver
+on receiving the policy-notifications, the policy-listener
passes the notifications to policy-updater
"""
+import json
import os
+from threading import Event, Lock, Thread
-from ..utils import ToBeImplementedException, Utils
+from ..onap.audit import Audit, AuditResponseCode
+from ..utils import Utils
+from .dmaap_mr import DmaapMr
+from .pdp_consts import (DEPLOYED_POLICIES, PDP_METADATA, PDP_POLICY_ID,
+ PDP_POLICY_VERSION, UNDEPLOYED_POLICIES)
_LOGGER = Utils.get_logger(__file__)
-class PolicyListener(object):
- """listener to PolicyEngine"""
+class PolicyListener(Thread):
+ """listener to DMaaP MR"""
PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__)))
+ SLEEP_BEFORE_RESTARTING = 30
- def __init__(self, *_):
+ def __init__(self, audit, policy_updater):
"""listener to receive the policy notifications from PolicyEngine"""
- _LOGGER.info("to_be_implemented")
- raise ToBeImplementedException()
+ Thread.__init__(self, name="policy_listener", daemon=True)
- def reconfigure(self, _):
- """configure and reconfigure the listener"""
- _LOGGER.info("to_be_implemented")
- raise ToBeImplementedException()
+ self._policy_updater = policy_updater
+ self._lock = Lock()
+ self._run_event = Event()
+ self._keep_running = True
+ self._first_loop = True
+
+ self._dmaap_mr = None
+ self.reconfigure(audit)
+
+ def reconfigure(self, audit):
+ """configure and reconfigure the DMaaP MR"""
+ reconfigured = DmaapMr.reconfigure(audit)
+ if reconfigured and not self._first_loop:
+ with self._lock:
+ self._first_loop = True
+ return reconfigured
def run(self):
- """listen on web-socket and pass the policy notifications to policy-updater"""
- _LOGGER.info("to_be_implemented")
- raise ToBeImplementedException()
-
- def shutdown(self, _):
- """Shutdown the policy-listener"""
- _LOGGER.info("to_be_implemented")
- raise ToBeImplementedException()
+ """listen on DMaaP MR and pass the policy notifications to policy-updater"""
+ _LOGGER.info("starting policy_listener...")
+ delayed_restarting = False
+ while True:
+ if not self._get_keep_running():
+ break
+
+ if delayed_restarting:
+ _LOGGER.info(
+ "going to sleep for %s secs before restarting policy-notifications",
+ PolicyListener.SLEEP_BEFORE_RESTARTING)
+
+ self._run_event.clear()
+ self._run_event.wait(PolicyListener.SLEEP_BEFORE_RESTARTING)
+ if not self._get_keep_running():
+ break
+
+ audit = Audit(job_name="policy_update",
+ req_message="waiting for policy-notifications...",
+ retry_get_config=True)
+
+ policy_updates = DmaapMr.get_policy_updates(audit)
+
+ if not self._get_keep_running():
+ audit.audit_done(result="exiting policy_listener")
+ break
+
+ delayed_restarting = not audit.is_success()
+ if self._first_loop:
+ policy_updater = None
+ with self._lock:
+ if self._first_loop:
+ self._first_loop = False
+ policy_updater = self._policy_updater
+ if policy_updater is not None:
+ audit.req_message = "first catch_up"
+ _LOGGER.info(audit.info("first catch_up - ignoring policy-updates: {}"
+ .format(json.dumps(policy_updates))))
+ policy_updater.catch_up(audit)
+ elif not policy_updates:
+ _LOGGER.info(audit.info(
+ "no policy-updates: {}".format(json.dumps(policy_updates))))
+ audit.audit_done(result="no policy-updates")
+ else:
+ self._on_policy_update_message(audit, policy_updates)
+
+ _LOGGER.info("exit policy_listener")
+
+ def _get_keep_running(self):
+ """thread-safe check whether to continue running"""
+ with self._lock:
+ keep_running = self._keep_running
+ return keep_running
+
+ def _on_policy_update_message(self, audit, policy_updates):
+ """received the notification from PDP"""
+ try:
+ _LOGGER.info("Received notification message: %s", json.dumps(policy_updates))
+ if not policy_updates:
+ return
+
+ policies_updated = []
+
+ for idx, pdp_update_msg in enumerate(policy_updates):
+ pdp_update_msg = Utils.safe_json_parse(pdp_update_msg)
+
+ if not pdp_update_msg or not isinstance(pdp_update_msg, dict):
+ _LOGGER.warning(audit.warn(
+ "unexpected message from PDP: {}".format(json.dumps(pdp_update_msg)),
+ error_code=AuditResponseCode.DATA_ERROR))
+ continue
+
+ _LOGGER.debug("raw policy_update[%s]: %s", idx, json.dumps(pdp_update_msg))
+
+ deployed_policies = [
+ {PDP_METADATA: {PDP_POLICY_ID: p_deployed.get(PDP_POLICY_ID),
+ PDP_POLICY_VERSION: p_deployed.get(PDP_POLICY_VERSION)}}
+ for p_deployed in pdp_update_msg.get(DEPLOYED_POLICIES, [])
+ if (p_deployed.get(PDP_POLICY_ID) is not None
+ and p_deployed.get(PDP_POLICY_VERSION) is not None)]
+
+ undeployed_policies = [
+ {PDP_METADATA: {PDP_POLICY_ID: p_undeployed.get(PDP_POLICY_ID),
+ PDP_POLICY_VERSION: p_undeployed.get(PDP_POLICY_VERSION)}}
+ for p_undeployed in pdp_update_msg.get(UNDEPLOYED_POLICIES, [])
+ if (p_undeployed.get(PDP_POLICY_ID) is not None
+ and p_undeployed.get(PDP_POLICY_VERSION) is not None)]
+
+ if not deployed_policies and not undeployed_policies:
+ _LOGGER.warning(audit.warn(
+ "no policy deployed or undeployed: {}".format(json.dumps(pdp_update_msg)),
+ error_code=AuditResponseCode.DATA_ERROR))
+ continue
+
+ policy_update = {DEPLOYED_POLICIES: deployed_policies,
+ UNDEPLOYED_POLICIES: undeployed_policies}
+ _LOGGER.info(audit.info("policy_update[{}]: {}"
+ .format(idx, json.dumps(policy_update))))
+
+ policies_updated.append(policy_update)
+
+ if not policies_updated:
+ _LOGGER.warning(audit.warn(
+ "erroneous notification from PDP: {}".format(json.dumps(policy_updates)),
+ error_code=AuditResponseCode.DATA_ERROR))
+ return
+
+ with self._lock:
+ policy_updater = self._policy_updater
+ if policy_updater is not None:
+ policy_updater.policy_update(audit, policies_updated)
+ except Exception as ex:
+ error_msg = "crash {} {} at {}: {}".format(type(ex).__name__, str(ex),
+ "on_policy_update_message",
+ json.dumps(policy_updates))
+ _LOGGER.exception(audit.fatal(error_msg))
+
+ def shutdown(self, audit):
+ """Shutdown the policy_listener"""
+ _LOGGER.info(audit.info("shutdown policy_listener - no waiting..."))
+ with self._lock:
+ self._keep_running = False
+ self._policy_updater = None
+ self._run_event.set()