aboutsummaryrefslogtreecommitdiffstats
path: root/policyhandler
diff options
context:
space:
mode:
authorAlex Shatov <alexs@att.com>2018-02-01 14:16:56 -0500
committerAlex Shatov <alexs@att.com>2018-02-01 14:16:56 -0500
commitac779d374ca12099eaeb8e5d89e65df37fd8a8f4 (patch)
tree181e0b72ead781dcab526124c81096980a552e8b /policyhandler
parent2322ef8736e839d62930d9b6c847ce818261c26c (diff)
improved message to deployment-handler and on API
* added errored_scopes and scope_prefixes to the message to deployment-handler - to prevent erroneous removal of policies * hardcoded condition for scope not found 404 at policy-engine to separate it from error on the scope retrieval 400 * adjusting the web API message in sync with notification to deployment-handler * unit test coverage 74% Change-Id: Ie736a1b7aee0631b6785669c6b765bd240dd77b8 Issue-ID: DCAEGEN2-249 Signed-off-by: Alex Shatov <alexs@att.com>
Diffstat (limited to 'policyhandler')
-rw-r--r--policyhandler/deploy_handler.py25
-rw-r--r--policyhandler/policy_consts.py10
-rw-r--r--policyhandler/policy_rest.py84
-rw-r--r--policyhandler/policy_updater.py19
-rw-r--r--policyhandler/web_server.py19
5 files changed, 82 insertions, 75 deletions
diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py
index 0dc86b9..306a637 100644
--- a/policyhandler/deploy_handler.py
+++ b/policyhandler/deploy_handler.py
@@ -1,6 +1,6 @@
# org.onap.dcae
# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -63,33 +63,20 @@ class DeployHandler(object):
DeployHandler._logger.info("DeployHandler url(%s)", DeployHandler._url)
@staticmethod
- def policy_update(audit, latest_policies, removed_policies=None,
- errored_policies=None, catch_up=False):
+ def policy_update(audit, message):
"""post policy_updated message to deploy-handler"""
- if not latest_policies and not removed_policies and not catch_up:
+ if not message:
return
- latest_policies = latest_policies or {}
- removed_policies = removed_policies or {}
- errored_policies = errored_policies or {}
-
DeployHandler._lazy_init()
- msg = {
- "catch_up" : catch_up,
- "latest_policies" : latest_policies,
- "removed_policies" : removed_policies,
- "errored_policies" : errored_policies
- }
sub_aud = Audit(aud_parent=audit, targetEntity=DeployHandler._target_entity,
targetServiceName=DeployHandler._url_path)
headers = {REQUEST_X_ECOMP_REQUESTID : sub_aud.request_id}
- msg_str = json.dumps(msg)
+ msg_str = json.dumps(message)
headers_str = json.dumps(headers)
- DeployHandler._logger.info(
- "catch_up(%s) latest_policies[%s], removed_policies[%s], errored_policies[%s]",
- catch_up, len(latest_policies), len(removed_policies), len(errored_policies))
+ DeployHandler._logger.info("message: %s", msg_str)
log_line = "post to deployment-handler {0} msg={1} headers={2}".format(
DeployHandler._url_path, msg_str, headers_str)
@@ -107,7 +94,7 @@ class DeployHandler(object):
res = None
try:
res = DeployHandler._requests_session.post(
- DeployHandler._url_path, json=msg, headers=headers
+ DeployHandler._url_path, json=message, headers=headers
)
except requests.exceptions.RequestException as ex:
error_msg = "failed to post to deployment-handler {0} {1} msg={2} headers={3}" \
diff --git a/policyhandler/policy_consts.py b/policyhandler/policy_consts.py
index f6a1a9e..bcac080 100644
--- a/policyhandler/policy_consts.py
+++ b/policyhandler/policy_consts.py
@@ -1,6 +1,6 @@
# org.onap.dcae
# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -24,3 +24,11 @@ POLICY_VERSION = "policyVersion"
POLICY_NAME = "policyName"
POLICY_BODY = 'policy_body'
POLICY_CONFIG = 'config'
+
+CATCH_UP = "catch_up"
+LATEST_POLICIES = "latest_policies"
+REMOVED_POLICIES = "removed_policies"
+ERRORED_POLICIES = "errored_policies"
+ERRORED_SCOPES = "errored_scopes"
+SCOPE_PREFIXES = "scope_prefixes"
+POLICY_FILTER = "policy_filter"
diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py
index 800c564..22ed640 100644
--- a/policyhandler/policy_rest.py
+++ b/policyhandler/policy_rest.py
@@ -19,18 +19,23 @@
"""policy-client communicates with policy-engine thru REST API"""
-import logging
-import json
import copy
+import json
+import logging
import time
from multiprocessing.dummy import Pool as ThreadPool
+
import requests
from .config import Config
-from .policy_consts import POLICY_ID, POLICY_NAME, POLICY_BODY, POLICY_CONFIG
-from .onap.audit import REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode, AuditResponseCode
+from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode,
+ AuditResponseCode)
+from .policy_consts import (ERRORED_POLICIES, ERRORED_SCOPES, POLICY_BODY,
+ POLICY_CONFIG, POLICY_FILTER, POLICY_ID,
+ POLICY_NAME, SCOPE_PREFIXES, LATEST_POLICIES)
from .policy_utils import PolicyUtils
+
class PolicyRest(object):
""" policy-engine """
_logger = logging.getLogger("policy_handler.policy_rest")
@@ -171,7 +176,7 @@ class PolicyRest(object):
return res.status_code, res_data
@staticmethod
- def validate_policy(policy):
+ def _validate_policy(policy):
"""Validates the config on policy"""
if not policy:
return
@@ -185,22 +190,6 @@ class PolicyRest(object):
)
@staticmethod
- def validate_policies(policies):
- """Validate the config on policies. Returns (valid, errored) tuple"""
- if not policies:
- return None, policies
-
- valid_policies = {}
- errored_policies = {}
- for (policy_id, policy) in policies.iteritems():
- if PolicyRest.validate_policy(policy):
- valid_policies[policy_id] = policy
- else:
- errored_policies[policy_id] = policy
-
- return valid_policies, errored_policies
-
- @staticmethod
def get_latest_policy(aud_policy_id):
"""Get the latest policy for the policy_id from the policy-engine"""
PolicyRest._lazy_init()
@@ -260,7 +249,7 @@ class PolicyRest(object):
return None
audit.set_http_status_code(status_code)
- if not PolicyRest.validate_policy(latest_policy):
+ if not PolicyRest._validate_policy(latest_policy):
audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
audit.error(
"received invalid policy from PDP: {0}".format(json.dumps(latest_policy)),
@@ -369,7 +358,7 @@ class PolicyRest(object):
get the latest policies by policy_filter
or all the latest policies of the same scope from the policy-engine
"""
- audit, policy_filter, error_if_not_found = aud_policy_filter
+ audit, policy_filter, scope_prefix = aud_policy_filter
str_policy_filter = json.dumps(policy_filter)
PolicyRest._logger.debug("%s", str_policy_filter)
@@ -379,8 +368,18 @@ class PolicyRest(object):
str_policy_filter, json.dumps(policy_configs or []))
latest_policies = PolicyUtils.select_latest_policies(policy_configs)
+
+ if scope_prefix and not policy_configs \
+ and status_code != AuditHttpCode.DATA_NOT_FOUND_ERROR.value:
+ audit.warn("PDP error {0} on scope_prefix {1}".format(status_code, scope_prefix),
+ errorCode=AuditResponseCode.DATA_ERROR.value,
+ errorDescription=AuditResponseCode.get_human_text(
+ AuditResponseCode.DATA_ERROR)
+ )
+ return None, latest_policies, scope_prefix
+
if not latest_policies:
- if error_if_not_found:
+ if not scope_prefix:
audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
audit.warn(
"received no policies from PDP for policy_filter {0}: {1}"
@@ -389,28 +388,38 @@ class PolicyRest(object):
errorDescription=AuditResponseCode.get_human_text(
AuditResponseCode.DATA_ERROR)
)
- return None, latest_policies
+ return None, latest_policies, None
audit.set_http_status_code(status_code)
- return PolicyRest.validate_policies(latest_policies)
+ valid_policies = {}
+ errored_policies = {}
+ for (policy_id, policy) in latest_policies.iteritems():
+ if PolicyRest._validate_policy(policy):
+ valid_policies[policy_id] = policy
+ else:
+ errored_policies[policy_id] = policy
+ return valid_policies, errored_policies, None
@staticmethod
def get_latest_policies(audit, policy_filter=None):
"""Get the latest policies of the same scope from the policy-engine"""
PolicyRest._lazy_init()
+ result = {}
aud_policy_filters = None
str_metrics = None
str_policy_filters = json.dumps(policy_filter or PolicyRest._scope_prefixes)
if policy_filter is not None:
- aud_policy_filters = [(audit, policy_filter, True)]
+ aud_policy_filters = [(audit, policy_filter, None)]
str_metrics = "get_latest_policies for policy_filter {0}".format(
str_policy_filters)
+ result[POLICY_FILTER] = copy.deepcopy(policy_filter)
else:
- aud_policy_filters = [(audit, {POLICY_NAME:scope_prefix + ".*"}, False)
+ aud_policy_filters = [(audit, {POLICY_NAME:scope_prefix + ".*"}, scope_prefix)
for scope_prefix in PolicyRest._scope_prefixes]
str_metrics = "get_latest_policies for scopes {0} {1}".format( \
len(PolicyRest._scope_prefixes), str_policy_filters)
+ result[SCOPE_PREFIXES] = copy.deepcopy(PolicyRest._scope_prefixes)
PolicyRest._logger.debug("%s", str_policy_filters)
audit.metrics_start(str_metrics)
@@ -429,15 +438,16 @@ class PolicyRest(object):
str_metrics, len(latest_policies), json.dumps(latest_policies)), \
targetEntity=PolicyRest._target_entity, targetServiceName=PolicyRest._url_get_config)
- # latest_policies == [(valid_policies, errored_policies), ...]
- valid_policies = dict(
- pair for (vps, _) in latest_policies if vps for pair in vps.iteritems())
+ # latest_policies == [(valid_policies, errored_policies, errored_scope_prefix), ...]
+ result[LATEST_POLICIES] = dict(
+ pair for (vps, _, _) in latest_policies if vps for pair in vps.iteritems())
- errored_policies = dict(
- pair for (_, eps) in latest_policies if eps for pair in eps.iteritems())
+ result[ERRORED_POLICIES] = dict(
+ pair for (_, eps, _) in latest_policies if eps for pair in eps.iteritems())
- PolicyRest._logger.debug(
- "got policies for policy_filters: %s. valid_policies: %s errored_policies: %s",
- str_policy_filters, json.dumps(valid_policies), json.dumps(errored_policies))
+ result[ERRORED_SCOPES] = [esp for (_, _, esp) in latest_policies if esp]
+
+ PolicyRest._logger.debug("got policies for policy_filters: %s. result: %s",
+ str_policy_filters, json.dumps(result))
- return valid_policies, errored_policies
+ return result
diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py
index 0e06f2c..2bce30b 100644
--- a/policyhandler/policy_updater.py
+++ b/policyhandler/policy_updater.py
@@ -1,6 +1,6 @@
# org.onap.dcae
# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -19,14 +19,15 @@
"""policy-updater thread"""
-import logging
import json
+import logging
from Queue import Queue
-from threading import Thread, Lock
+from threading import Lock, Thread
-from .policy_rest import PolicyRest
from .deploy_handler import DeployHandler
from .onap.audit import Audit
+from .policy_consts import CATCH_UP, LATEST_POLICIES, REMOVED_POLICIES
+from .policy_rest import PolicyRest
class PolicyUpdater(Thread):
"""queue and handle the policy-updates in a separate thread"""
@@ -72,7 +73,8 @@ class PolicyUpdater(Thread):
updated_policies, removed_policies = PolicyRest.get_latest_updated_policies(
(audit, policies_updated, policies_removed))
- DeployHandler.policy_update(audit, updated_policies, removed_policies=removed_policies)
+ message = {LATEST_POLICIES: updated_policies, REMOVED_POLICIES: removed_policies}
+ DeployHandler.policy_update(audit, message)
audit.audit_done()
self._queue.task_done()
@@ -114,15 +116,16 @@ class PolicyUpdater(Thread):
return False
PolicyUpdater._logger.info("catch_up")
- latest_policies, errored_policies = PolicyRest.get_latest_policies(aud_catch_up)
+
+ result = PolicyRest.get_latest_policies(aud_catch_up)
+ result[CATCH_UP] = True
if not aud_catch_up.is_success():
PolicyUpdater._logger.warn("not sending catch-up to deployment-handler due to errors")
if not audit:
self._queue.task_done()
else:
- DeployHandler.policy_update(
- aud_catch_up, latest_policies, errored_policies=errored_policies, catch_up=True)
+ DeployHandler.policy_update(aud_catch_up, result)
self._reset_queue()
success, _, _ = aud_catch_up.audit_done()
PolicyUpdater._logger.info("policy_handler health: %s", json.dumps(Audit.health()))
diff --git a/policyhandler/web_server.py b/policyhandler/web_server.py
index 5b3227d..17c06b4 100644
--- a/policyhandler/web_server.py
+++ b/policyhandler/web_server.py
@@ -79,16 +79,15 @@ class _PolicyWeb(object):
PolicyWeb.logger.info("%s", req_info)
- valid_policies, errored_policies = PolicyRest.get_latest_policies(audit)
+ result = PolicyRest.get_latest_policies(audit)
- res = {"valid_policies": valid_policies, "errored_policies": errored_policies}
- PolicyWeb.logger.info("result %s: %s", req_info, json.dumps(res))
+ PolicyWeb.logger.info("result %s: %s", req_info, json.dumps(result))
- success, http_status_code, _ = audit.audit_done(result=json.dumps(res))
+ success, http_status_code, _ = audit.audit_done(result=json.dumps(result))
if not success:
cherrypy.response.status = http_status_code
- return res
+ return result
@cherrypy.expose
@cherrypy.tools.json_out()
@@ -153,16 +152,16 @@ class _PolicyWeb(object):
PolicyWeb.logger.info("%s: policy_filter=%s headers=%s", \
req_info, str_policy_filter, json.dumps(cherrypy.request.headers))
- res, _ = PolicyRest.get_latest_policies(audit, policy_filter=policy_filter) or {}
+ result = PolicyRest.get_latest_policies(audit, policy_filter=policy_filter) or {}
- PolicyWeb.logger.info("result %s: policy_filter=%s res=%s", \
- req_info, str_policy_filter, json.dumps(res))
+ PolicyWeb.logger.info("result %s: policy_filter=%s result=%s", \
+ req_info, str_policy_filter, json.dumps(result))
- success, http_status_code, _ = audit.audit_done(result=json.dumps(res))
+ success, http_status_code, _ = audit.audit_done(result=json.dumps(result))
if not success:
cherrypy.response.status = http_status_code
- return res
+ return result
@cherrypy.expose
@cherrypy.tools.json_out()