aboutsummaryrefslogtreecommitdiffstats
path: root/policyhandler/pdp_api/policy_listener.py
diff options
context:
space:
mode:
Diffstat (limited to 'policyhandler/pdp_api/policy_listener.py')
-rw-r--r--policyhandler/pdp_api/policy_listener.py176
1 files changed, 155 insertions, 21 deletions
diff --git a/policyhandler/pdp_api/policy_listener.py b/policyhandler/pdp_api/policy_listener.py
index 9fa4695..0d33785 100644
--- a/policyhandler/pdp_api/policy_listener.py
+++ b/policyhandler/pdp_api/policy_listener.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -17,39 +17,173 @@
"""
policy-listener communicates with policy-engine
-to receive push notifications
+to receive push notifications through DMaaP MR
on updates and removal of policies.
-on receiving the policy-notifications, the policy-receiver
+on receiving the policy-notifications, the policy-listener
passes the notifications to policy-updater
"""
+import json
import os
+from threading import Event, Lock, Thread
-from ..utils import ToBeImplementedException, Utils
+from ..onap.audit import Audit, AuditResponseCode
+from ..utils import Utils
+from .dmaap_mr import DmaapMr
+from .pdp_consts import (DEPLOYED_POLICIES, PDP_METADATA, PDP_POLICY_ID,
+ PDP_POLICY_VERSION, UNDEPLOYED_POLICIES)
_LOGGER = Utils.get_logger(__file__)
-class PolicyListener(object):
- """listener to PolicyEngine"""
+class PolicyListener(Thread):
+ """listener to DMaaP MR"""
PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__)))
+ SLEEP_BEFORE_RESTARTING = 30
- def __init__(self, *_):
+ def __init__(self, audit, policy_updater):
"""listener to receive the policy notifications from PolicyEngine"""
- _LOGGER.info("to_be_implemented")
- raise ToBeImplementedException()
+ Thread.__init__(self, name="policy_listener", daemon=True)
- def reconfigure(self, _):
- """configure and reconfigure the listener"""
- _LOGGER.info("to_be_implemented")
- raise ToBeImplementedException()
+ self._policy_updater = policy_updater
+ self._lock = Lock()
+ self._run_event = Event()
+ self._keep_running = True
+ self._first_loop = True
+
+ self._dmaap_mr = None
+ self.reconfigure(audit)
+
+ def reconfigure(self, audit):
+ """configure and reconfigure the DMaaP MR"""
+ reconfigured = DmaapMr.reconfigure(audit)
+ if reconfigured and not self._first_loop:
+ with self._lock:
+ self._first_loop = True
+ return reconfigured
def run(self):
- """listen on web-socket and pass the policy notifications to policy-updater"""
- _LOGGER.info("to_be_implemented")
- raise ToBeImplementedException()
-
- def shutdown(self, _):
- """Shutdown the policy-listener"""
- _LOGGER.info("to_be_implemented")
- raise ToBeImplementedException()
+ """listen on DMaaP MR and pass the policy notifications to policy-updater"""
+ _LOGGER.info("starting policy_listener...")
+ delayed_restarting = False
+ while True:
+ if not self._get_keep_running():
+ break
+
+ if delayed_restarting:
+ _LOGGER.info(
+ "going to sleep for %s secs before restarting policy-notifications",
+ PolicyListener.SLEEP_BEFORE_RESTARTING)
+
+ self._run_event.clear()
+ self._run_event.wait(PolicyListener.SLEEP_BEFORE_RESTARTING)
+ if not self._get_keep_running():
+ break
+
+ audit = Audit(job_name="policy_update",
+ req_message="waiting for policy-notifications...",
+ retry_get_config=True)
+
+ policy_updates = DmaapMr.get_policy_updates(audit)
+
+ if not self._get_keep_running():
+ audit.audit_done(result="exiting policy_listener")
+ break
+
+ delayed_restarting = not audit.is_success()
+ if self._first_loop:
+ policy_updater = None
+ with self._lock:
+ if self._first_loop:
+ self._first_loop = False
+ policy_updater = self._policy_updater
+ if policy_updater is not None:
+ audit.req_message = "first catch_up"
+ _LOGGER.info(audit.info("first catch_up - ignoring policy-updates: {}"
+ .format(json.dumps(policy_updates))))
+ policy_updater.catch_up(audit)
+ elif not policy_updates:
+ _LOGGER.info(audit.info(
+ "no policy-updates: {}".format(json.dumps(policy_updates))))
+ audit.audit_done(result="no policy-updates")
+ else:
+ self._on_policy_update_message(audit, policy_updates)
+
+ _LOGGER.info("exit policy_listener")
+
+ def _get_keep_running(self):
+ """thread-safe check whether to continue running"""
+ with self._lock:
+ keep_running = self._keep_running
+ return keep_running
+
+ def _on_policy_update_message(self, audit, policy_updates):
+ """received the notification from PDP"""
+ try:
+ _LOGGER.info("Received notification message: %s", json.dumps(policy_updates))
+ if not policy_updates:
+ return
+
+ policies_updated = []
+
+ for idx, pdp_update_msg in enumerate(policy_updates):
+ pdp_update_msg = Utils.safe_json_parse(pdp_update_msg)
+
+ if not pdp_update_msg or not isinstance(pdp_update_msg, dict):
+ _LOGGER.warning(audit.warn(
+ "unexpected message from PDP: {}".format(json.dumps(pdp_update_msg)),
+ error_code=AuditResponseCode.DATA_ERROR))
+ continue
+
+ _LOGGER.debug("raw policy_update[%s]: %s", idx, json.dumps(pdp_update_msg))
+
+ deployed_policies = [
+ {PDP_METADATA: {PDP_POLICY_ID: p_deployed.get(PDP_POLICY_ID),
+ PDP_POLICY_VERSION: p_deployed.get(PDP_POLICY_VERSION)}}
+ for p_deployed in pdp_update_msg.get(DEPLOYED_POLICIES, [])
+ if (p_deployed.get(PDP_POLICY_ID) is not None
+ and p_deployed.get(PDP_POLICY_VERSION) is not None)]
+
+ undeployed_policies = [
+ {PDP_METADATA: {PDP_POLICY_ID: p_undeployed.get(PDP_POLICY_ID),
+ PDP_POLICY_VERSION: p_undeployed.get(PDP_POLICY_VERSION)}}
+ for p_undeployed in pdp_update_msg.get(UNDEPLOYED_POLICIES, [])
+ if (p_undeployed.get(PDP_POLICY_ID) is not None
+ and p_undeployed.get(PDP_POLICY_VERSION) is not None)]
+
+ if not deployed_policies and not undeployed_policies:
+ _LOGGER.warning(audit.warn(
+ "no policy deployed or undeployed: {}".format(json.dumps(pdp_update_msg)),
+ error_code=AuditResponseCode.DATA_ERROR))
+ continue
+
+ policy_update = {DEPLOYED_POLICIES: deployed_policies,
+ UNDEPLOYED_POLICIES: undeployed_policies}
+ _LOGGER.info(audit.info("policy_update[{}]: {}"
+ .format(idx, json.dumps(policy_update))))
+
+ policies_updated.append(policy_update)
+
+ if not policies_updated:
+ _LOGGER.warning(audit.warn(
+ "erroneous notification from PDP: {}".format(json.dumps(policy_updates)),
+ error_code=AuditResponseCode.DATA_ERROR))
+ return
+
+ with self._lock:
+ policy_updater = self._policy_updater
+ if policy_updater is not None:
+ policy_updater.policy_update(audit, policies_updated)
+ except Exception as ex:
+ error_msg = "crash {} {} at {}: {}".format(type(ex).__name__, str(ex),
+ "on_policy_update_message",
+ json.dumps(policy_updates))
+ _LOGGER.exception(audit.fatal(error_msg))
+
+ def shutdown(self, audit):
+ """Shutdown the policy_listener"""
+ _LOGGER.info(audit.info("shutdown policy_listener - no waiting..."))
+ with self._lock:
+ self._keep_running = False
+ self._policy_updater = None
+ self._run_event.set()