aboutsummaryrefslogtreecommitdiffstats
path: root/policyhandler/pdp_api_v0
diff options
context:
space:
mode:
Diffstat (limited to 'policyhandler/pdp_api_v0')
-rw-r--r--policyhandler/pdp_api_v0/__init__.py30
-rw-r--r--policyhandler/pdp_api_v0/pdp_consts.py23
-rw-r--r--policyhandler/pdp_api_v0/policy_listener.py309
-rw-r--r--policyhandler/pdp_api_v0/policy_matcher.py265
-rw-r--r--policyhandler/pdp_api_v0/policy_rest.py605
-rw-r--r--policyhandler/pdp_api_v0/policy_updates.py107
-rw-r--r--policyhandler/pdp_api_v0/policy_utils.py120
7 files changed, 1459 insertions, 0 deletions
diff --git a/policyhandler/pdp_api_v0/__init__.py b/policyhandler/pdp_api_v0/__init__.py
new file mode 100644
index 0000000..0196508
--- /dev/null
+++ b/policyhandler/pdp_api_v0/__init__.py
@@ -0,0 +1,30 @@
+# ================================================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+
+"""<=2018 http api to policy-engine /getConfig that is going to be replaced in 2019"""
+
+from .policy_matcher import PolicyMatcher
+from .policy_rest import PolicyRest
+from .policy_listener import PolicyListener
+from .policy_updates import PolicyUpdates
+
+def get_pdp_api_info():
+ """info on which version of pdp api is in effect"""
+ return ("folders: PolicyMatcher({}), PolicyRest({}), PolicyListener({}), PolicyUpdates({})"
+ .format(PolicyMatcher.PDP_API_FOLDER, PolicyRest.PDP_API_FOLDER,
+ PolicyListener.PDP_API_FOLDER, PolicyUpdates.PDP_API_FOLDER
+ ))
diff --git a/policyhandler/pdp_api_v0/pdp_consts.py b/policyhandler/pdp_api_v0/pdp_consts.py
new file mode 100644
index 0000000..d1c0b44
--- /dev/null
+++ b/policyhandler/pdp_api_v0/pdp_consts.py
@@ -0,0 +1,23 @@
+# ================================================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+
+"""contants of PDP"""
+
+POLICY_VERSION = "policyVersion"
+POLICY_NAME = "policyName"
+POLICY_CONFIG = 'config'
+MATCHING_CONDITIONS = "matchingConditions"
diff --git a/policyhandler/pdp_api_v0/policy_listener.py b/policyhandler/pdp_api_v0/policy_listener.py
new file mode 100644
index 0000000..67e4c49
--- /dev/null
+++ b/policyhandler/pdp_api_v0/policy_listener.py
@@ -0,0 +1,309 @@
+# ================================================================================
+# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+
+"""
+policy-listener communicates with policy-engine
+thru web-socket to receive push notifications
+on updates and removal of policies.
+
+on receiving the policy-notifications, the policy-receiver
+passes the notifications to policy-updater
+"""
+
+import copy
+import json
+import os
+import ssl
+import time
+import urllib.parse
+from datetime import datetime
+from threading import Lock, Thread
+
+import websocket
+
+from ..config import Config, Settings
+from ..onap.audit import Audit
+from ..utils import Utils
+from .pdp_consts import MATCHING_CONDITIONS, POLICY_NAME, POLICY_VERSION
+
+LOADED_POLICIES = 'loadedPolicies'
+REMOVED_POLICIES = 'removedPolicies'
+POLICY_VER = 'versionNo'
+POLICY_MATCHES = 'matches'
+
+_LOGGER = Utils.get_logger(__file__)
+
+class PolicyListener(Thread):
+ """web-socket to PolicyEngine"""
+ PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__)))
+ WS_STARTED = "started"
+ WS_START_COUNT = "start_count"
+ WS_CLOSE_COUNT = "close_count"
+ WS_ERROR_COUNT = "error_count"
+ WS_PONG_COUNT = "pong_count"
+ WS_MESSAGE_COUNT = "message_count"
+ WS_MESSAGE_TIMESTAMP = "message_timestamp"
+ WS_STATUS = "status"
+ WS_PING_INTERVAL_DEFAULT = 30
+ WEB_SOCKET_HEALTH = "web_socket_health"
+
+ def __init__(self, audit, policy_updater):
+ """web-socket inside the thread to receive policy notifications from PolicyEngine"""
+ Thread.__init__(self, name="policy_receiver", daemon=True)
+
+ self._policy_updater = policy_updater
+ self._lock = Lock()
+ self._keep_running = True
+ self._settings = Settings(Config.FIELD_POLICY_ENGINE)
+
+ self._sleep_before_restarting = 5
+ self._web_socket_url = None
+ self._web_socket_sslopt = None
+ self._tls_wss_ca_mode = None
+ self._web_socket = None
+ self._ws_ping_interval_in_secs = PolicyListener.WS_PING_INTERVAL_DEFAULT
+ self._web_socket_health = {
+ PolicyListener.WS_START_COUNT: 0,
+ PolicyListener.WS_CLOSE_COUNT: 0,
+ PolicyListener.WS_ERROR_COUNT: 0,
+ PolicyListener.WS_PONG_COUNT: 0,
+ PolicyListener.WS_MESSAGE_COUNT: 0,
+ PolicyListener.WS_STATUS: "created"
+ }
+
+ Audit.register_item_health(PolicyListener.WEB_SOCKET_HEALTH, self._get_health)
+ self.reconfigure(audit)
+
+ def reconfigure(self, audit):
+ """configure and reconfigure the web-socket"""
+ with self._lock:
+ _LOGGER.info(audit.info("web_socket_health {}".format(
+ json.dumps(self._get_health(), sort_keys=True))))
+ self._sleep_before_restarting = 5
+ self._settings.set_config(Config.discovered_config)
+ changed, config = self._settings.get_by_key(Config.FIELD_POLICY_ENGINE)
+
+ if not changed:
+ self._settings.commit_change()
+ return False
+
+ prev_web_socket_url = self._web_socket_url
+ prev_web_socket_sslopt = self._web_socket_sslopt
+ prev_ws_ping_interval_in_secs = self._ws_ping_interval_in_secs
+
+ self._web_socket_sslopt = None
+
+ resturl = urllib.parse.urljoin(config.get("url", "").lower().rstrip("/") + "/",
+ config.get("path_notifications", "/pdp/notifications"))
+
+ self._tls_wss_ca_mode = config.get(Config.TLS_WSS_CA_MODE)
+
+ self._ws_ping_interval_in_secs = config.get(Config.WS_PING_INTERVAL_IN_SECS)
+ if not self._ws_ping_interval_in_secs or self._ws_ping_interval_in_secs < 60:
+ self._ws_ping_interval_in_secs = PolicyListener.WS_PING_INTERVAL_DEFAULT
+
+ if resturl.startswith("https:"):
+ self._web_socket_url = resturl.replace("https:", "wss:")
+
+ verify = Config.get_tls_verify(self._tls_wss_ca_mode)
+ if verify is False:
+ self._web_socket_sslopt = {'cert_reqs': ssl.CERT_NONE}
+ elif verify is True:
+ pass
+ else:
+ self._web_socket_sslopt = {'ca_certs': verify}
+
+ else:
+ self._web_socket_url = resturl.replace("http:", "ws:")
+
+ log_changed = (
+ "changed web_socket_url(%s) or tls_wss_ca_mode(%s)"
+ " or ws_ping_interval_in_secs(%s): %s" %
+ (self._web_socket_url, self._tls_wss_ca_mode, self._ws_ping_interval_in_secs,
+ self._settings))
+ if (self._web_socket_url == prev_web_socket_url
+ and Utils.are_the_same(prev_web_socket_sslopt, self._web_socket_sslopt)
+ and prev_ws_ping_interval_in_secs == self._ws_ping_interval_in_secs):
+ _LOGGER.info(audit.info("not {}".format(log_changed)))
+ self._settings.commit_change()
+ return False
+
+ _LOGGER.info(audit.info(log_changed))
+ self._settings.commit_change()
+
+ self._stop_notifications()
+ return True
+
+ def run(self):
+ """listen on web-socket and pass the policy notifications to policy-updater"""
+ _LOGGER.info("starting policy_receiver...")
+ websocket.enableTrace(True)
+ restarting = False
+ while True:
+ if not self._get_keep_running():
+ break
+
+ self._stop_notifications()
+
+ if restarting:
+ with self._lock:
+ sleep_before_restarting = self._sleep_before_restarting
+ _LOGGER.info(
+ "going to sleep for %s secs before restarting policy-notifications",
+ sleep_before_restarting)
+
+ time.sleep(sleep_before_restarting)
+ if not self._get_keep_running():
+ break
+
+ with self._lock:
+ web_socket_url = self._web_socket_url
+ sslopt = copy.deepcopy(self._web_socket_sslopt)
+ tls_wss_ca_mode = self._tls_wss_ca_mode
+ ws_ping_interval_in_secs = self._ws_ping_interval_in_secs
+
+ _LOGGER.info(
+ "connecting to policy-notifications at %s with sslopt(%s) tls_wss_ca_mode(%s)"
+ " ws_ping_interval_in_secs(%s)",
+ web_socket_url, json.dumps(sslopt), tls_wss_ca_mode, ws_ping_interval_in_secs)
+
+ self._web_socket = websocket.WebSocketApp(
+ web_socket_url,
+ on_open=self._on_ws_open,
+ on_message=self._on_pdp_message,
+ on_close=self._on_ws_close,
+ on_error=self._on_ws_error,
+ on_pong=self._on_ws_pong
+ )
+
+ _LOGGER.info("waiting for policy-notifications...")
+ self._web_socket.run_forever(sslopt=sslopt, ping_interval=ws_ping_interval_in_secs)
+ restarting = True
+
+ Audit.register_item_health(PolicyListener.WEB_SOCKET_HEALTH)
+ _LOGGER.info("exit policy-receiver")
+
+ def _get_keep_running(self):
+ """thread-safe check whether to continue running"""
+ with self._lock:
+ keep_running = self._keep_running
+ return keep_running
+
+ def _stop_notifications(self):
+ """close the web-socket == stops the notification service if running."""
+ with self._lock:
+ if self._web_socket and self._web_socket.sock and self._web_socket.sock.connected:
+ self._web_socket.close()
+ _LOGGER.info("stopped receiving notifications from PDP")
+
+ def _on_pdp_message(self, *args):
+ """received the notification from PDP"""
+ self._web_socket_health[PolicyListener.WS_MESSAGE_COUNT] += 1
+ self._web_socket_health[PolicyListener.WS_MESSAGE_TIMESTAMP] = str(datetime.utcnow())
+ try:
+ message = args and args[-1]
+ _LOGGER.info("Received notification message: %s", message)
+ _LOGGER.info("web_socket_health %s", json.dumps(self._get_health(), sort_keys=True))
+ if not message:
+ return
+ message = json.loads(message)
+
+ if not message or not isinstance(message, dict):
+ _LOGGER.warning("unexpected message from PDP: %s", json.dumps(message))
+ return
+
+ policies_updated = [
+ {POLICY_NAME: policy.get(POLICY_NAME),
+ POLICY_VERSION: policy.get(POLICY_VER),
+ MATCHING_CONDITIONS: policy.get(POLICY_MATCHES, {})}
+ for policy in message.get(LOADED_POLICIES, [])
+ ]
+
+ policies_removed = [
+ {POLICY_NAME: removed_policy.get(POLICY_NAME),
+ POLICY_VERSION: removed_policy.get(POLICY_VER)}
+ for removed_policy in message.get(REMOVED_POLICIES, [])
+ ]
+
+ if not policies_updated and not policies_removed:
+ _LOGGER.info("no policy updated or removed")
+ return
+
+ self._policy_updater.policy_update(policies_updated, policies_removed)
+ except Exception as ex:
+ error_msg = "crash {} {} at {}: {}".format(type(ex).__name__, str(ex),
+ "on_pdp_message", json.dumps(message))
+
+ _LOGGER.exception(error_msg)
+
+ def _on_ws_error(self, error):
+ """report an error"""
+ _LOGGER.exception("policy-notification error %s", str(error))
+ self._sleep_before_restarting = 60 if isinstance(error, ssl.SSLError) else 5
+
+ self._web_socket_health[PolicyListener.WS_STATUS] = "error"
+ self._web_socket_health[PolicyListener.WS_ERROR_COUNT] += 1
+ self._web_socket_health["last_error"] = {
+ "error": str(error), "timestamp": str(datetime.utcnow())
+ }
+ _LOGGER.info("web_socket_health %s", json.dumps(self._get_health(), sort_keys=True))
+
+ def _on_ws_close(self, code, reason):
+ """restart web-socket on close"""
+ self._web_socket_health["last_closed"] = str(datetime.utcnow())
+ self._web_socket_health[PolicyListener.WS_STATUS] = "closed"
+ self._web_socket_health[PolicyListener.WS_CLOSE_COUNT] += 1
+ _LOGGER.info(
+ "lost connection(%s, %s) to PDP web_socket_health %s",
+ code, reason, json.dumps(self._get_health(), sort_keys=True))
+
+ def _on_ws_open(self):
+ """started web-socket"""
+ self._web_socket_health[PolicyListener.WS_STATUS] = PolicyListener.WS_STARTED
+ self._web_socket_health[PolicyListener.WS_START_COUNT] += 1
+ self._web_socket_health[PolicyListener.WS_STARTED] = datetime.utcnow()
+ _LOGGER.info("opened connection to PDP web_socket_health %s",
+ json.dumps(self._get_health(), sort_keys=True))
+
+ def _on_ws_pong(self, pong):
+ """pong = response to pinging the server of the web-socket"""
+ self._web_socket_health[PolicyListener.WS_PONG_COUNT] += 1
+ _LOGGER.info(
+ "pong(%s) from connection to PDP web_socket_health %s",
+ pong, json.dumps(self._get_health(), sort_keys=True))
+
+ def _get_health(self):
+ """returns the healthcheck of the web-socket as json"""
+ web_socket_health = copy.deepcopy(self._web_socket_health)
+ web_socket_health[Config.WS_PING_INTERVAL_IN_SECS] = self._ws_ping_interval_in_secs
+ started = web_socket_health.get(PolicyListener.WS_STARTED)
+ if started:
+ web_socket_health[PolicyListener.WS_STARTED] = str(started)
+ web_socket_health["uptime"] = str(datetime.utcnow() - started)
+ return web_socket_health
+
+
+ def shutdown(self, audit):
+ """Shutdown the policy-listener"""
+ _LOGGER.info(audit.info("shutdown policy-listener"))
+ with self._lock:
+ self._keep_running = False
+
+ self._stop_notifications()
+
+ if self.is_alive():
+ self.join()
diff --git a/policyhandler/pdp_api_v0/policy_matcher.py b/policyhandler/pdp_api_v0/policy_matcher.py
new file mode 100644
index 0000000..357af49
--- /dev/null
+++ b/policyhandler/pdp_api_v0/policy_matcher.py
@@ -0,0 +1,265 @@
+# ================================================================================
+# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+
+"""policy-matcher matches the policies from deployment-handler to policies from policy-engine"""
+
+import json
+import os
+import re
+
+from ..deploy_handler import DeployHandler, PolicyUpdateMessage
+from ..onap.audit import AuditHttpCode, AuditResponseCode
+from ..policy_consts import (ERRORED_POLICIES, LATEST_POLICIES, POLICY_BODY,
+ POLICY_FILTER, POLICY_VERSIONS)
+from ..utils import RegexCoarser, Utils
+from .pdp_consts import MATCHING_CONDITIONS, POLICY_NAME, POLICY_VERSION
+from .policy_rest import PolicyRest
+
+
+_LOGGER = Utils.get_logger(__file__)
+
+class PolicyMatcher(object):
+ """policy-matcher - static class"""
+ PENDING_UPDATE = "pending_update"
+ PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__)))
+
+ @staticmethod
+ def get_deployed_policies(audit):
+ """get the deployed policies and policy-filters"""
+ deployed_policies, deployed_policy_filters = DeployHandler.get_deployed_policies(audit)
+
+ if audit.is_not_found():
+ warning_txt = "got no deployed policies or policy-filters"
+ _LOGGER.warning(warning_txt)
+ return {"warning": warning_txt}, None, None
+
+ if not audit.is_success() or (not deployed_policies and not deployed_policy_filters):
+ error_txt = "failed to retrieve policies from deployment-handler"
+ _LOGGER.error(error_txt)
+ return {"error": error_txt}, None, None
+
+ return None, deployed_policies, deployed_policy_filters
+
+
+ @staticmethod
+ def build_catch_up_message(audit, deployed_policies, deployed_policy_filters):
+ """
+ find the latest policies from policy-engine for the deployed policies and policy-filters
+ """
+
+ if not (deployed_policies or deployed_policy_filters):
+ error_txt = "no deployed policies or policy-filters"
+ _LOGGER.warning(error_txt)
+ return {"error": error_txt}, None
+
+ coarse_regex_patterns = PolicyMatcher.calc_coarse_patterns(
+ audit, deployed_policies, deployed_policy_filters)
+
+ if not coarse_regex_patterns:
+ error_txt = ("failed to construct the coarse_regex_patterns from " +
+ "deployed_policies: {} and deployed_policy_filters: {}"
+ .format(deployed_policies, deployed_policy_filters))
+ _LOGGER.error(audit.error(
+ error_txt, error_code=AuditResponseCode.DATA_ERROR))
+ audit.set_http_status_code(AuditHttpCode.DATA_ERROR.value)
+ return {"error": error_txt}, None
+
+ pdp_response = PolicyRest.get_latest_policies(
+ audit, policy_filters=[{POLICY_NAME: policy_name_pattern}
+ for policy_name_pattern in coarse_regex_patterns]
+ )
+
+ if not audit.is_success():
+ error_txt = "failed to retrieve policies from policy-engine"
+ _LOGGER.warning(error_txt)
+ return {"error": error_txt}, None
+
+ latest_policies = pdp_response.get(LATEST_POLICIES, {})
+ errored_policies = pdp_response.get(ERRORED_POLICIES, {})
+
+ latest_policies, changed_policies, policy_filter_matches = PolicyMatcher._match_policies(
+ audit, latest_policies, deployed_policies, deployed_policy_filters)
+
+ errored_policies = dict((policy_id, policy)
+ for (policy_id, policy) in errored_policies.items()
+ if deployed_policies.get(policy_id, {}).get(POLICY_VERSIONS))
+
+ removed_policies = dict(
+ (policy_id, True)
+ for (policy_id, deployed_policy) in deployed_policies.items()
+ if deployed_policy.get(POLICY_VERSIONS)
+ and policy_id not in latest_policies
+ and policy_id not in errored_policies
+ )
+
+ return ({LATEST_POLICIES: latest_policies, ERRORED_POLICIES: errored_policies},
+ PolicyUpdateMessage(changed_policies,
+ removed_policies,
+ policy_filter_matches))
+
+
+ @staticmethod
+ def calc_coarse_patterns(audit, deployed_policies, deployed_policy_filters):
+ """calculate the coarsed patterns on policy-names in policies and policy-filters"""
+ coarse_regex = RegexCoarser()
+ for policy_id in deployed_policies or {}:
+ coarse_regex.add_regex_pattern(policy_id)
+
+ for policy_filter in (deployed_policy_filters or {}).values():
+ policy_name_pattern = policy_filter.get(POLICY_FILTER, {}).get(POLICY_NAME)
+ coarse_regex.add_regex_pattern(policy_name_pattern)
+
+ coarse_regex_patterns = coarse_regex.get_coarse_regex_patterns()
+ _LOGGER.debug(
+ audit.debug("coarse_regex_patterns({}) combined_regex_pattern({}) for patterns({})"
+ .format(coarse_regex_patterns,
+ coarse_regex.get_combined_regex_pattern(),
+ coarse_regex.patterns)))
+ return coarse_regex_patterns
+
+
+ @staticmethod
+ def match_to_deployed_policies(audit, policies_updated, policies_removed):
+ """match the policies_updated, policies_removed versus deployed policies"""
+ deployed_policies, deployed_policy_filters = DeployHandler.get_deployed_policies(audit)
+ if not audit.is_success():
+ return {}, {}, {}
+
+ _, changed_policies, policy_filter_matches = PolicyMatcher._match_policies(
+ audit, policies_updated, deployed_policies, deployed_policy_filters)
+
+ policies_removed = dict((policy_id, policy)
+ for (policy_id, policy) in policies_removed.items()
+ if deployed_policies.get(policy_id, {}).get(POLICY_VERSIONS))
+
+ return changed_policies, policies_removed, policy_filter_matches
+
+
+ @staticmethod
+ def _match_policies(audit, policies, deployed_policies, deployed_policy_filters):
+ """
+ Match policies to deployed policies either by policy_id or the policy-filters.
+
+ Also calculates the policies that changed in comparison to deployed policies
+ """
+ matching_policies = {}
+ changed_policies = {}
+ policy_filter_matches = {}
+
+ policies = policies or {}
+ deployed_policies = deployed_policies or {}
+ deployed_policy_filters = deployed_policy_filters or {}
+
+ for (policy_id, policy) in policies.items():
+ new_version = policy.get(POLICY_BODY, {}).get(POLICY_VERSION)
+ deployed_policy = deployed_policies.get(policy_id)
+
+ if deployed_policy:
+ matching_policies[policy_id] = policy
+
+ policy_changed = (deployed_policy and new_version
+ and (deployed_policy.get(PolicyMatcher.PENDING_UPDATE)
+ or {new_version} ^
+ deployed_policy.get(POLICY_VERSIONS, {}).keys()))
+ if policy_changed:
+ changed_policies[policy_id] = policy
+ policy_filter_matches[policy_id] = {}
+
+ in_filters = False
+ for (policy_filter_id, policy_filter) in deployed_policy_filters.items():
+ if not PolicyMatcher._match_policy_to_filter(
+ audit, policy_id, policy,
+ policy_filter_id, policy_filter.get(POLICY_FILTER)):
+ continue
+
+ if policy_changed or not deployed_policy:
+ in_filters = True
+ if policy_id not in policy_filter_matches:
+ policy_filter_matches[policy_id] = {}
+ policy_filter_matches[policy_id][policy_filter_id] = True
+
+ if not deployed_policy and in_filters:
+ matching_policies[policy_id] = policy
+ changed_policies[policy_id] = policy
+
+ return matching_policies, changed_policies, policy_filter_matches
+
+
+ @staticmethod
+ def _match_policy_to_filter(audit, policy_id, policy, policy_filter_id, policy_filter):
+ """Match the policy to the policy-filter"""
+ if not policy_id or not policy or not policy_filter or not policy_filter_id:
+ return False
+
+ filter_policy_name = policy_filter.get(POLICY_NAME)
+ if not filter_policy_name:
+ return False
+
+ policy_body = policy.get(POLICY_BODY)
+ if not policy_body:
+ return False
+
+ policy_name = policy_body.get(POLICY_NAME)
+ if not policy_name:
+ return False
+
+ log_line = "policy {} to filter id {}: {}".format(json.dumps(policy),
+ policy_filter_id,
+ json.dumps(policy_filter))
+ # _LOGGER.debug(audit.debug("matching {}...".format(log_line)))
+
+ if (filter_policy_name != policy_id and filter_policy_name != policy_name
+ and not re.match(filter_policy_name, policy_name)):
+ _LOGGER.debug(
+ audit.debug("not match by policyName: {} != {}: {}"
+ .format(policy_name, filter_policy_name, log_line)))
+ return False
+
+ matching_conditions = policy_body.get(MATCHING_CONDITIONS, {})
+ if not isinstance(matching_conditions, dict):
+ return False
+
+ filter_onap_name = policy_filter.get("onapName")
+ policy_onap_name = matching_conditions.get("ONAPName")
+ if filter_onap_name and filter_onap_name != policy_onap_name:
+ _LOGGER.debug(
+ audit.debug("not match by ONAPName: {} != {}: {}"
+ .format(policy_onap_name, filter_onap_name, log_line)))
+ return False
+
+ filter_config_name = policy_filter.get("configName")
+ policy_config_name = matching_conditions.get("ConfigName")
+ if filter_config_name and filter_config_name != policy_config_name:
+ _LOGGER.debug(
+ audit.debug("not match by configName: {} != {}: {}"
+ .format(policy_config_name, filter_config_name, log_line)))
+ return False
+
+ filter_config_attributes = policy_filter.get("configAttributes")
+ if filter_config_attributes and isinstance(filter_config_attributes, dict):
+ for filter_key, filter_config_attribute in filter_config_attributes.items():
+ if (filter_key not in matching_conditions
+ or filter_config_attribute != matching_conditions.get(filter_key)):
+ _LOGGER.debug(
+ audit.debug("not match by configAttributes: {} != {}: {}"
+ .format(json.dumps(matching_conditions),
+ json.dumps(filter_config_attributes),
+ log_line)))
+ return False
+
+ _LOGGER.debug(audit.debug("matched {}".format(log_line)))
+ return True
diff --git a/policyhandler/pdp_api_v0/policy_rest.py b/policyhandler/pdp_api_v0/policy_rest.py
new file mode 100644
index 0000000..c59625e
--- /dev/null
+++ b/policyhandler/pdp_api_v0/policy_rest.py
@@ -0,0 +1,605 @@
+# ================================================================================
+# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+
+"""policy-client communicates with policy-engine thru REST API"""
+
+import copy
+import json
+import os
+import time
+import urllib.parse
+from multiprocessing.dummy import Pool as ThreadPool
+from threading import Lock
+
+import requests
+
+from ..config import Config, Settings
+from ..onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode,
+ AuditResponseCode, Metrics)
+from ..policy_consts import (ERRORED_POLICIES, LATEST_POLICIES, POLICY_BODY,
+ POLICY_FILTER, POLICY_FILTERS, POLICY_ID,
+ POLICY_NAMES)
+from ..utils import Utils
+from .pdp_consts import POLICY_CONFIG, POLICY_NAME, POLICY_VERSION
+from .policy_utils import PolicyUtils
+
+_LOGGER = Utils.get_logger(__file__)
+
+class PolicyRest(object):
+ """using the http API to policy-engine"""
+ PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__)))
+ _lazy_inited = False
+ POLICY_GET_CONFIG = 'getConfig'
+ PDP_CONFIG_STATUS = "policyConfigStatus"
+ PDP_CONFIG_RETRIEVED = "CONFIG_RETRIEVED"
+ PDP_CONFIG_NOT_FOUND = "CONFIG_NOT_FOUND"
+ PDP_CONFIG_MESSAGE = "policyConfigMessage"
+ PDP_NO_RESPONSE_RECEIVED = "No Response Received"
+ PDP_STATUS_CODE_ERROR = 400
+ PDP_DATA_NOT_FOUND = "PE300 - Data Issue: Incorrect Params passed: Decision not a Permit."
+
+ EXPECTED_VERSIONS = "expected_versions"
+ IGNORE_POLICY_NAMES = "ignore_policy_names"
+ DEFAULT_TIMEOUT_IN_SECS = 60
+
+ _lock = Lock()
+ _settings = Settings(Config.FIELD_POLICY_ENGINE, Config.POOL_CONNECTIONS,
+ Config.THREAD_POOL_SIZE,
+ Config.POLICY_RETRY_COUNT, Config.POLICY_RETRY_SLEEP)
+
+ _requests_session = None
+ _url = None
+ _url_get_config = None
+ _headers = None
+ _target_entity = None
+ _custom_kwargs = {}
+ _thread_pool_size = 4
+ _policy_retry_count = 1
+ _policy_retry_sleep = 0
+ _timeout_in_secs = DEFAULT_TIMEOUT_IN_SECS
+
+ @staticmethod
+ def _init():
+ """init static config"""
+ PolicyRest._custom_kwargs = {}
+ tls_ca_mode = None
+
+ if not PolicyRest._requests_session:
+ PolicyRest._requests_session = requests.Session()
+
+ changed, pool_size = PolicyRest._settings.get_by_key(Config.POOL_CONNECTIONS, 20)
+ if changed:
+ PolicyRest._requests_session.mount(
+ 'https://', requests.adapters.HTTPAdapter(pool_connections=pool_size,
+ pool_maxsize=pool_size))
+ PolicyRest._requests_session.mount(
+ 'http://', requests.adapters.HTTPAdapter(pool_connections=pool_size,
+ pool_maxsize=pool_size))
+
+ _, config = PolicyRest._settings.get_by_key(Config.FIELD_POLICY_ENGINE)
+ if config:
+ PolicyRest._url = config.get("url")
+ if PolicyRest._url:
+ path_get_config = urllib.parse.urljoin(
+ config.get("path_api", "pdp/api").strip("/")
+ + "/", PolicyRest.POLICY_GET_CONFIG)
+ PolicyRest._url_get_config = urllib.parse.urljoin(PolicyRest._url, path_get_config)
+ PolicyRest._headers = config.get("headers", {})
+ PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE)
+ _, PolicyRest._thread_pool_size = PolicyRest._settings.get_by_key(
+ Config.THREAD_POOL_SIZE, 4)
+ if PolicyRest._thread_pool_size < 2:
+ PolicyRest._thread_pool_size = 2
+
+ _, PolicyRest._policy_retry_count = PolicyRest._settings.get_by_key(
+ Config.POLICY_RETRY_COUNT, 1)
+ _, PolicyRest._policy_retry_sleep = PolicyRest._settings.get_by_key(
+ Config.POLICY_RETRY_SLEEP, 0)
+
+ tls_ca_mode = config.get(Config.TLS_CA_MODE)
+ PolicyRest._custom_kwargs = Config.get_requests_kwargs(tls_ca_mode)
+ PolicyRest._timeout_in_secs = config.get(Config.TIMEOUT_IN_SECS)
+ if not PolicyRest._timeout_in_secs or PolicyRest._timeout_in_secs < 1:
+ PolicyRest._timeout_in_secs = PolicyRest.DEFAULT_TIMEOUT_IN_SECS
+
+ _LOGGER.info(
+ "PDP(%s) url(%s) headers(%s) tls_ca_mode(%s) timeout_in_secs(%s) custom_kwargs(%s): %s",
+ PolicyRest._target_entity, PolicyRest._url_get_config,
+ Metrics.json_dumps(PolicyRest._headers), tls_ca_mode,
+ PolicyRest._timeout_in_secs, json.dumps(PolicyRest._custom_kwargs),
+ PolicyRest._settings)
+
+ PolicyRest._settings.commit_change()
+ PolicyRest._lazy_inited = True
+
+ @staticmethod
+ def reconfigure():
+ """reconfigure"""
+ with PolicyRest._lock:
+ PolicyRest._settings.set_config(Config.discovered_config)
+ if not PolicyRest._settings.is_changed():
+ PolicyRest._settings.commit_change()
+ return False
+
+ PolicyRest._lazy_inited = False
+ PolicyRest._init()
+ return True
+
+ @staticmethod
+ def _lazy_init():
+ """init static config"""
+ if PolicyRest._lazy_inited:
+ return
+
+ with PolicyRest._lock:
+ if PolicyRest._lazy_inited:
+ return
+
+ PolicyRest._settings.set_config(Config.discovered_config)
+ PolicyRest._init()
+
+ @staticmethod
+ def _pdp_get_config(audit, json_body):
+ """Communication with the policy-engine"""
+ if not PolicyRest._url:
+ _LOGGER.error(
+ audit.error("no url for PDP", error_code=AuditResponseCode.AVAILABILITY_ERROR))
+ audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ return None
+
+ with PolicyRest._lock:
+ session = PolicyRest._requests_session
+ target_entity = PolicyRest._target_entity
+ url = PolicyRest._url_get_config
+ timeout_in_secs = PolicyRest._timeout_in_secs
+ headers = copy.deepcopy(PolicyRest._headers)
+ custom_kwargs = copy.deepcopy(PolicyRest._custom_kwargs)
+
+ metrics = Metrics(aud_parent=audit, targetEntity=target_entity, targetServiceName=url)
+
+ headers[REQUEST_X_ECOMP_REQUESTID] = metrics.request_id
+
+ log_action = "post to {} at {}".format(target_entity, url)
+ log_data = "msg={} headers={}, custom_kwargs({}) timeout_in_secs({})".format(
+ json.dumps(json_body), Metrics.json_dumps(headers), json.dumps(custom_kwargs),
+ timeout_in_secs)
+ log_line = log_action + " " + log_data
+
+ _LOGGER.info(metrics.metrics_start(log_line))
+
+ res = None
+ try:
+ res = session.post(url, json=json_body, headers=headers, timeout=timeout_in_secs,
+ **custom_kwargs)
+ except Exception as ex:
+ error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
+ if isinstance(ex, requests.exceptions.RequestException)
+ else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ error_msg = ("failed {}: {} to {}".format(type(ex).__name__, str(ex), log_line))
+
+ _LOGGER.exception(error_msg)
+ metrics.set_http_status_code(error_code)
+ audit.set_http_status_code(error_code)
+ metrics.metrics(error_msg)
+ return (error_code, None)
+
+ log_line = "response {} from {}: text={} headers={}".format(
+ res.status_code, log_line, res.text,
+ Metrics.json_dumps(dict(res.request.headers.items())))
+
+ status_code, res_data = PolicyRest._extract_pdp_res_data(audit, metrics, log_line, res)
+
+ if status_code:
+ return status_code, res_data
+
+ metrics.set_http_status_code(res.status_code)
+ metrics.metrics(log_line)
+ _LOGGER.info(log_line)
+ return res.status_code, res_data
+
+ @staticmethod
+ def _extract_pdp_res_data(audit, metrics, log_line, res):
+ """special treatment of pdp response"""
+ res_data = None
+ if res.status_code == requests.codes.ok:
+ res_data = res.json()
+
+ if res_data and isinstance(res_data, list) and len(res_data) == 1:
+ rslt = res_data[0] or {}
+ if not rslt.get(POLICY_NAME):
+ res_data = None
+ if rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_NO_RESPONSE_RECEIVED:
+ error_code = AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
+ error_msg = "{} unexpected {}".format(error_code, log_line)
+
+ _LOGGER.error(error_msg)
+ metrics.set_http_status_code(error_code)
+ audit.set_http_status_code(error_code)
+ metrics.metrics(error_msg)
+ return error_code, None
+ return None, res_data
+
+ if res.status_code == PolicyRest.PDP_STATUS_CODE_ERROR:
+ try:
+ res_data = res.json()
+ except ValueError:
+ return None, None
+
+ if not res_data or not isinstance(res_data, list) or len(res_data) != 1:
+ return None, None
+
+ rslt = res_data[0]
+ if (rslt and not rslt.get(POLICY_NAME)
+ and rslt.get(PolicyRest.PDP_CONFIG_STATUS) == PolicyRest.PDP_CONFIG_NOT_FOUND
+ and rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_DATA_NOT_FOUND):
+ status_code = AuditHttpCode.DATA_NOT_FOUND_OK.value
+ info_msg = "{} not found {}".format(status_code, log_line)
+
+ _LOGGER.info(info_msg)
+ metrics.set_http_status_code(status_code)
+ metrics.metrics(info_msg)
+ return status_code, None
+ return None, None
+
+
+ @staticmethod
+ def _validate_policy(policy):
+ """Validates the config on policy"""
+ if not policy:
+ return
+
+ policy_body = policy.get(POLICY_BODY)
+
+ return bool(
+ policy_body
+ and policy_body.get(PolicyRest.PDP_CONFIG_STATUS) == PolicyRest.PDP_CONFIG_RETRIEVED
+ and policy_body.get(POLICY_CONFIG)
+ )
+
+ @staticmethod
+ def get_latest_policy(aud_policy_id):
+ """safely try retrieving the latest policy for the policy_id from the policy-engine"""
+ audit, policy_id, expected_versions, ignore_policy_names = aud_policy_id
+ str_metrics = "policy_id({0}), expected_versions({1}) ignore_policy_names({2})".format(
+ policy_id, json.dumps(expected_versions), json.dumps(ignore_policy_names))
+
+ try:
+ return PolicyRest._get_latest_policy(
+ audit, policy_id, expected_versions, ignore_policy_names, str_metrics)
+
+ except Exception as ex:
+ error_msg = ("{0}: crash {1} {2} at {3}: {4}"
+ .format(audit.request_id, type(ex).__name__, str(ex),
+ "get_latest_policy", str_metrics))
+
+ _LOGGER.exception(error_msg)
+ audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+ audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ return None
+
+
+ @staticmethod
+ def _get_latest_policy(audit, policy_id,
+ expected_versions, ignore_policy_names, str_metrics):
+ """retry several times getting the latest policy for the policy_id from the policy-engine"""
+ PolicyRest._lazy_init()
+ latest_policy = None
+ status_code = 0
+ retry_get_config = audit.kwargs.get("retry_get_config")
+ expect_policy_removed = (ignore_policy_names and not expected_versions)
+
+ for retry in range(1, PolicyRest._policy_retry_count + 1):
+ _LOGGER.debug("try(%s) retry_get_config(%s): %s", retry, retry_get_config, str_metrics)
+
+ done, latest_policy, status_code = PolicyRest._get_latest_policy_once(
+ audit, policy_id, expected_versions, ignore_policy_names,
+ expect_policy_removed)
+
+ if done or not retry_get_config or not PolicyRest._policy_retry_sleep:
+ break
+
+ if retry == PolicyRest._policy_retry_count:
+ _LOGGER.error(
+ audit.error("gave up retrying after #{} for policy_id({}) from PDP {}"
+ .format(retry, policy_id, PolicyRest._url_get_config),
+ error_code=AuditResponseCode.DATA_ERROR))
+ break
+
+ _LOGGER.warning(audit.warn(
+ "will retry({}) for policy_id({}) in {} secs from PDP {}".format(
+ retry, policy_id, PolicyRest._policy_retry_sleep, PolicyRest._url_get_config),
+ error_code=AuditResponseCode.DATA_ERROR))
+ time.sleep(PolicyRest._policy_retry_sleep)
+
+ if (expect_policy_removed and not latest_policy
+ and AuditHttpCode.RESPONSE_ERROR.value == status_code):
+ audit.set_http_status_code(AuditHttpCode.HTTP_OK.value)
+ return None
+
+ audit.set_http_status_code(status_code)
+ if not PolicyRest._validate_policy(latest_policy):
+ audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value)
+ _LOGGER.error(audit.error(
+ "received invalid policy from PDP: {}".format(json.dumps(latest_policy)),
+ error_code=AuditResponseCode.DATA_ERROR))
+
+ return latest_policy
+
+ @staticmethod
+ def _get_latest_policy_once(audit, policy_id,
+ expected_versions, ignore_policy_names,
+ expect_policy_removed):
+ """single attempt to get the latest policy for the policy_id from the policy-engine"""
+
+ status_code, policy_bodies = PolicyRest._pdp_get_config(audit, {POLICY_NAME:policy_id})
+
+ _LOGGER.debug("%s %s policy_bodies: %s",
+ status_code, policy_id, json.dumps(policy_bodies or []))
+
+ latest_policy = PolicyUtils.select_latest_policy(
+ policy_bodies, expected_versions, ignore_policy_names
+ )
+
+ if not latest_policy and not expect_policy_removed:
+ _LOGGER.error(
+ audit.error("received unexpected policy data from PDP for policy_id={}: {}"
+ .format(policy_id, json.dumps(policy_bodies or [])),
+ error_code=AuditResponseCode.DATA_ERROR))
+
+ done = bool(latest_policy
+ or (expect_policy_removed and not policy_bodies)
+ or audit.is_serious_error(status_code))
+
+ return done, latest_policy, status_code
+
+ @staticmethod
+ def get_latest_updated_policies(audit, updated_policies, removed_policies):
+ """safely try retrieving the latest policies for the list of policy_names"""
+ if not updated_policies and not removed_policies:
+ return None, None
+
+ policies_updated = [(policy_id, policy.get(POLICY_BODY, {}).get(POLICY_VERSION))
+ for policy_id, policy in updated_policies.items()]
+ policies_removed = [(policy_id, policy.get(POLICY_NAMES, {}))
+ for policy_id, policy in removed_policies.items()]
+
+ if not policies_updated and not policies_removed:
+ return None, None
+
+ str_metrics = "policies_updated[{0}]: {1} policies_removed[{2}]: {3}".format(
+ len(policies_updated), json.dumps(policies_updated),
+ len(policies_removed), json.dumps(policies_removed))
+
+ try:
+ return PolicyRest._get_latest_updated_policies(
+ audit, str_metrics, policies_updated, policies_removed)
+
+ except Exception as ex:
+ error_msg = ("{0}: crash {1} {2} at {3}: {4}"
+ .format(audit.request_id, type(ex).__name__, str(ex),
+ "get_latest_updated_policies", str_metrics))
+
+ _LOGGER.exception(error_msg)
+ audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+ audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ return None, None
+
+ @staticmethod
+ def _get_latest_updated_policies(audit, str_metrics, policies_updated, policies_removed):
+ """Get the latest policies of the list of policy_names from the policy-engine"""
+ PolicyRest._lazy_init()
+ metrics_total = Metrics(
+ aud_parent=audit,
+ targetEntity="{0} total get_latest_updated_policies".format(PolicyRest._target_entity),
+ targetServiceName=PolicyRest._url_get_config)
+
+ metrics_total.metrics_start("get_latest_updated_policies {0}".format(str_metrics))
+ _LOGGER.debug(str_metrics)
+
+ policies_to_find = {}
+ for (policy_id, policy_version) in policies_updated:
+ if not policy_id or not policy_version or not policy_version.isdigit():
+ continue
+ policy = policies_to_find.get(policy_id)
+ if not policy:
+ policies_to_find[policy_id] = {
+ POLICY_ID: policy_id,
+ PolicyRest.EXPECTED_VERSIONS: {policy_version: True},
+ PolicyRest.IGNORE_POLICY_NAMES: {}
+ }
+ continue
+ policy[PolicyRest.EXPECTED_VERSIONS][policy_version] = True
+
+ for (policy_id, policy_names) in policies_removed:
+ if not policy_id:
+ continue
+ policy = policies_to_find.get(policy_id)
+ if not policy:
+ policies_to_find[policy_id] = {
+ POLICY_ID: policy_id,
+ PolicyRest.IGNORE_POLICY_NAMES: policy_names
+ }
+ continue
+ policy[PolicyRest.IGNORE_POLICY_NAMES].update(policy_names)
+
+ apns = [(audit, policy_id,
+ policy_to_find.get(PolicyRest.EXPECTED_VERSIONS),
+ policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES))
+ for (policy_id, policy_to_find) in policies_to_find.items()]
+
+ policies = None
+ apns_length = len(apns)
+ _LOGGER.debug("apns_length(%s) policies_to_find %s", apns_length,
+ json.dumps(policies_to_find))
+
+ if apns_length == 1:
+ policies = [PolicyRest.get_latest_policy(apns[0])]
+ else:
+ pool = ThreadPool(min(PolicyRest._thread_pool_size, apns_length))
+ policies = pool.map(PolicyRest.get_latest_policy, apns)
+ pool.close()
+ pool.join()
+
+ metrics_total.metrics("result({}) get_latest_updated_policies {}: {} {}"
+ .format(apns_length, str_metrics,
+ len(policies), json.dumps(policies)))
+
+ updated_policies = dict((policy[POLICY_ID], policy)
+ for policy in policies
+ if policy and policy.get(POLICY_ID))
+
+ removed_policies = dict((policy_id, True)
+ for (policy_id, policy_to_find) in policies_to_find.items()
+ if not policy_to_find.get(PolicyRest.EXPECTED_VERSIONS)
+ and policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES)
+ and policy_id not in updated_policies)
+
+ errored_policies = dict((policy_id, policy_to_find)
+ for (policy_id, policy_to_find) in policies_to_find.items()
+ if policy_id not in updated_policies
+ and policy_id not in removed_policies)
+
+ _LOGGER.debug(
+ "result(%s) updated_policies %s, removed_policies %s, errored_policies %s",
+ apns_length, json.dumps(updated_policies), json.dumps(removed_policies),
+ json.dumps(errored_policies))
+
+ if errored_policies:
+ audit.set_http_status_code(AuditHttpCode.DATA_ERROR.value)
+ audit.error(
+ "errored_policies in PDP: {}".format(json.dumps(errored_policies)),
+ error_code=AuditResponseCode.DATA_ERROR)
+
+ return updated_policies, removed_policies
+
+
+ @staticmethod
+ def _get_latest_policies(aud_policy_filter):
+ """get the latest policies by policy_filter from the policy-engine"""
+ audit, policy_filter = aud_policy_filter
+ try:
+ str_policy_filter = json.dumps(policy_filter)
+ _LOGGER.debug("%s", str_policy_filter)
+
+ status_code, policy_bodies = PolicyRest._pdp_get_config(audit, policy_filter)
+ audit.set_http_status_code(status_code)
+
+ _LOGGER.debug("%s policy_bodies: %s %s", status_code,
+ str_policy_filter, json.dumps(policy_bodies or []))
+
+ latest_policies = PolicyUtils.select_latest_policies(policy_bodies)
+
+ if not latest_policies:
+ audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value)
+ _LOGGER.warning(audit.warn(
+ "received no policies from PDP for policy_filter {}: {}"
+ .format(str_policy_filter, json.dumps(policy_bodies or [])),
+ error_code=AuditResponseCode.DATA_ERROR))
+ return None, latest_policies
+
+ valid_policies = {}
+ errored_policies = {}
+ for (policy_id, policy) in latest_policies.items():
+ if PolicyRest._validate_policy(policy):
+ valid_policies[policy_id] = policy
+ else:
+ errored_policies[policy_id] = policy
+ return valid_policies, errored_policies
+
+ except Exception as ex:
+ error_msg = ("{0}: crash {1} {2} at {3}: policy_filter({4})"
+ .format(audit.request_id, type(ex).__name__, str(ex),
+ "_get_latest_policies", json.dumps(policy_filter)))
+
+ _LOGGER.exception(error_msg)
+ audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+ audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ return None, None
+
+
+ @staticmethod
+ def get_latest_policies(audit, policy_filter=None, policy_filters=None):
+ """Get the latest policies by policy-filter(s) from the policy-engine"""
+ result = {}
+ aud_policy_filters = None
+ str_policy_filters = None
+ str_metrics = None
+ target_entity = None
+
+ try:
+ PolicyRest._lazy_init()
+ if policy_filter:
+ aud_policy_filters = [(audit, policy_filter)]
+ str_policy_filters = json.dumps(policy_filter)
+ str_metrics = "get_latest_policies for policy_filter {0}".format(
+ str_policy_filters)
+ target_entity = ("{0} total get_latest_policies by policy_filter"
+ .format(PolicyRest._target_entity))
+ result[POLICY_FILTER] = copy.deepcopy(policy_filter)
+ elif policy_filters:
+ aud_policy_filters = [
+ (audit, policy_filter)
+ for policy_filter in policy_filters
+ ]
+ str_policy_filters = json.dumps(policy_filters)
+ str_metrics = "get_latest_policies for policy_filters {0}".format(
+ str_policy_filters)
+ target_entity = ("{0} total get_latest_policies by policy_filters"
+ .format(PolicyRest._target_entity))
+ result[POLICY_FILTERS] = copy.deepcopy(policy_filters)
+ else:
+ return result
+
+ _LOGGER.debug("%s", str_policy_filters)
+ metrics_total = Metrics(aud_parent=audit, targetEntity=target_entity,
+ targetServiceName=PolicyRest._url_get_config)
+
+ metrics_total.metrics_start(str_metrics)
+
+ latest_policies = None
+ apfs_length = len(aud_policy_filters)
+ if apfs_length == 1:
+ latest_policies = [PolicyRest._get_latest_policies(aud_policy_filters[0])]
+ else:
+ pool = ThreadPool(min(PolicyRest._thread_pool_size, apfs_length))
+ latest_policies = pool.map(PolicyRest._get_latest_policies, aud_policy_filters)
+ pool.close()
+ pool.join()
+
+ metrics_total.metrics("total result {0}: {1} {2}".format(
+ str_metrics, len(latest_policies), json.dumps(latest_policies)))
+
+ # latest_policies == [(valid_policies, errored_policies), ...]
+ result[LATEST_POLICIES] = dict(
+ pair for (vps, _) in latest_policies if vps for pair in vps.items())
+
+ result[ERRORED_POLICIES] = dict(
+ pair for (_, eps) in latest_policies if eps for pair in eps.items())
+
+ _LOGGER.debug("got policies for policy_filters: %s. result: %s",
+ str_policy_filters, json.dumps(result))
+ return result
+
+ except Exception as ex:
+ error_msg = ("{0}: crash {1} {2} at {3}: {4}"
+ .format(audit.request_id, type(ex).__name__, str(ex),
+ "get_latest_policies", str_metrics))
+
+ _LOGGER.exception(error_msg)
+ audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+ audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ return None
diff --git a/policyhandler/pdp_api_v0/policy_updates.py b/policyhandler/pdp_api_v0/policy_updates.py
new file mode 100644
index 0000000..eafdca2
--- /dev/null
+++ b/policyhandler/pdp_api_v0/policy_updates.py
@@ -0,0 +1,107 @@
+# ================================================================================
+# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+
+"""policy-updates accumulates the policy-update notifications from PDP"""
+
+import json
+import os
+
+from ..onap.audit import Audit
+from ..policy_consts import POLICY_ID, POLICY_NAMES
+from ..utils import Utils
+from .pdp_consts import POLICY_NAME
+from .policy_utils import PolicyUtils
+
+
+_LOGGER = Utils.get_logger(__file__)
+
+class PolicyUpdates(object):
+ """Keep and consolidate the policy-updates (audit, policies_updated, policies_removed)"""
+ PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__)))
+
+ def __init__(self):
+ """init and reset"""
+ self._audit = None
+ self._policies_updated = {}
+ self._policies_removed = {}
+
+ def reset(self):
+ """resets the state"""
+ self.__init__()
+
+ def pop_policy_updates(self):
+ """
+ Returns the consolidated (audit, policies_updated, policies_removed)
+ and resets the state
+ """
+ if not self._audit:
+ return None, None, None
+
+ audit = self._audit
+ policies_updated = self._policies_updated
+ policies_removed = self._policies_removed
+
+ self.reset()
+
+ return audit, policies_updated, policies_removed
+
+
+ def push_policy_updates(self, policies_updated, policies_removed):
+ """consolidate the new policies_updated, policies_removed to existing ones"""
+ for policy_body in policies_updated:
+ policy_name = policy_body.get(POLICY_NAME)
+ policy = PolicyUtils.convert_to_policy(policy_body)
+ if not policy:
+ continue
+ policy_id = policy.get(POLICY_ID)
+
+ self._policies_updated[policy_id] = policy
+
+ rm_policy_names = self._policies_removed.get(policy_id, {}).get(POLICY_NAMES)
+ if rm_policy_names and policy_name in rm_policy_names:
+ del rm_policy_names[policy_name]
+
+ for policy_body in policies_removed:
+ policy_name = policy_body.get(POLICY_NAME)
+ policy = PolicyUtils.convert_to_policy(policy_body)
+ if not policy:
+ continue
+ policy_id = policy.get(POLICY_ID)
+
+ if policy_id in self._policies_removed:
+ policy = self._policies_removed[policy_id]
+
+ if POLICY_NAMES not in policy:
+ policy[POLICY_NAMES] = {}
+ policy[POLICY_NAMES][policy_name] = True
+ self._policies_removed[policy_id] = policy
+
+ req_message = ("policy-update notification - updated[{0}], removed[{1}]"
+ .format(len(self._policies_updated),
+ len(self._policies_removed)))
+
+ if not self._audit:
+ self._audit = Audit(job_name="policy_update",
+ req_message=req_message,
+ retry_get_config=True)
+ else:
+ self._audit.req_message = req_message
+
+ _LOGGER.info(
+ "pending(%s) for %s policies_updated %s policies_removed %s",
+ self._audit.request_id, req_message,
+ json.dumps(self._policies_updated), json.dumps(self._policies_removed))
diff --git a/policyhandler/pdp_api_v0/policy_utils.py b/policyhandler/pdp_api_v0/policy_utils.py
new file mode 100644
index 0000000..d337665
--- /dev/null
+++ b/policyhandler/pdp_api_v0/policy_utils.py
@@ -0,0 +1,120 @@
+# ================================================================================
+# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+
+"""utils for policy usage and conversions"""
+
+import re
+
+from ..policy_consts import POLICY_BODY, POLICY_ID
+from ..utils import Utils
+from .pdp_consts import POLICY_CONFIG, POLICY_NAME, POLICY_VERSION
+
+
+class PolicyUtils(object):
+ """policy-client utils"""
+ _policy_name_ext = re.compile('[.][0-9]+[.][a-zA-Z]+$')
+
+ @staticmethod
+ def extract_policy_id(policy_name):
+ """ policy_name = policy_id + "." + <version> + "." + <extension>
+ For instance,
+ policy_name = DCAE_alex.Config_alex_policy_number_1.3.xml
+ policy_id = DCAE_alex.Config_alex_policy_number_1
+ policy_scope = DCAE_alex
+ policy_class = Config
+ policy_version = 3
+ type = extension = xml
+ delimiter = "."
+ policy_class_delimiter = "_"
+ policy_name in PAP = DCAE_alex.alex_policy_number_1
+ """
+ if not policy_name:
+ return
+ return PolicyUtils._policy_name_ext.sub('', policy_name)
+
+ @staticmethod
+ def parse_policy_config(policy):
+ """try parsing the config in policy."""
+ if not policy:
+ return policy
+ config = policy.get(POLICY_BODY, {}).get(POLICY_CONFIG)
+ if config:
+ policy[POLICY_BODY][POLICY_CONFIG] = Utils.safe_json_parse(config)
+ return policy
+
+ @staticmethod
+ def convert_to_policy(policy_body):
+ """wrap policy_body received from policy-engine with policy_id."""
+ if not policy_body:
+ return None
+ policy_name = policy_body.get(POLICY_NAME)
+ policy_version = policy_body.get(POLICY_VERSION)
+ if not policy_name or not policy_version:
+ return None
+ policy_id = PolicyUtils.extract_policy_id(policy_name)
+ if not policy_id:
+ return None
+ return {POLICY_ID:policy_id, POLICY_BODY:policy_body}
+
+ @staticmethod
+ def select_latest_policy(policy_bodies, expected_versions=None, ignore_policy_names=None):
+ """For some reason, the policy-engine returns all version of the policy_bodies.
+ DCAE-Controller is only interested in the latest version
+ """
+ if not policy_bodies:
+ return
+ latest_policy_body = {}
+ for policy_body in policy_bodies:
+ policy_name = policy_body.get(POLICY_NAME)
+ policy_version = policy_body.get(POLICY_VERSION)
+ if not policy_name or not policy_version or not policy_version.isdigit():
+ continue
+ if expected_versions and policy_version not in expected_versions:
+ continue
+ if ignore_policy_names and policy_name in ignore_policy_names:
+ continue
+
+ if (not latest_policy_body
+ or int(latest_policy_body[POLICY_VERSION]) < int(policy_version)):
+ latest_policy_body = policy_body
+
+ return PolicyUtils.parse_policy_config(PolicyUtils.convert_to_policy(latest_policy_body))
+
+ @staticmethod
+ def select_latest_policies(policy_bodies):
+ """For some reason, the policy-engine returns all version of the policy_bodies.
+ DCAE-Controller is only interested in the latest versions
+ """
+ if not policy_bodies:
+ return {}
+ policies = {}
+ for policy_body in policy_bodies:
+ policy = PolicyUtils.convert_to_policy(policy_body)
+ if not policy:
+ continue
+ policy_id = policy.get(POLICY_ID)
+ policy_version = policy.get(POLICY_BODY, {}).get(POLICY_VERSION)
+ if not policy_id or not policy_version or not policy_version.isdigit():
+ continue
+ if (policy_id not in policies
+ or int(policy_version) > int(policies[policy_id][POLICY_BODY][POLICY_VERSION])):
+ policies[policy_id] = policy
+
+ for policy_id in policies:
+ policies[policy_id] = PolicyUtils.parse_policy_config(policies[policy_id])
+
+ return policies