diff options
author | Alex Shatov <alexs@att.com> | 2018-02-01 14:16:56 -0500 |
---|---|---|
committer | Alex Shatov <alexs@att.com> | 2018-02-01 14:16:56 -0500 |
commit | ac779d374ca12099eaeb8e5d89e65df37fd8a8f4 (patch) | |
tree | 181e0b72ead781dcab526124c81096980a552e8b /policyhandler | |
parent | 2322ef8736e839d62930d9b6c847ce818261c26c (diff) |
improved message to deployment-handler and on API
* added errored_scopes and scope_prefixes to the message
to deployment-handler - to prevent erroneous
removal of policies
* hardcoded condition for scope not found 404 at policy-engine
to separate it from error on the scope retrieval 400
* adjusting the web API message in sync with notification
to deployment-handler
* unit test coverage 74%
Change-Id: Ie736a1b7aee0631b6785669c6b765bd240dd77b8
Issue-ID: DCAEGEN2-249
Signed-off-by: Alex Shatov <alexs@att.com>
Diffstat (limited to 'policyhandler')
-rw-r--r-- | policyhandler/deploy_handler.py | 25 | ||||
-rw-r--r-- | policyhandler/policy_consts.py | 10 | ||||
-rw-r--r-- | policyhandler/policy_rest.py | 84 | ||||
-rw-r--r-- | policyhandler/policy_updater.py | 19 | ||||
-rw-r--r-- | policyhandler/web_server.py | 19 |
5 files changed, 82 insertions, 75 deletions
diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py index 0dc86b9..306a637 100644 --- a/policyhandler/deploy_handler.py +++ b/policyhandler/deploy_handler.py @@ -1,6 +1,6 @@ # org.onap.dcae # ================================================================================ -# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -63,33 +63,20 @@ class DeployHandler(object): DeployHandler._logger.info("DeployHandler url(%s)", DeployHandler._url) @staticmethod - def policy_update(audit, latest_policies, removed_policies=None, - errored_policies=None, catch_up=False): + def policy_update(audit, message): """post policy_updated message to deploy-handler""" - if not latest_policies and not removed_policies and not catch_up: + if not message: return - latest_policies = latest_policies or {} - removed_policies = removed_policies or {} - errored_policies = errored_policies or {} - DeployHandler._lazy_init() - msg = { - "catch_up" : catch_up, - "latest_policies" : latest_policies, - "removed_policies" : removed_policies, - "errored_policies" : errored_policies - } sub_aud = Audit(aud_parent=audit, targetEntity=DeployHandler._target_entity, targetServiceName=DeployHandler._url_path) headers = {REQUEST_X_ECOMP_REQUESTID : sub_aud.request_id} - msg_str = json.dumps(msg) + msg_str = json.dumps(message) headers_str = json.dumps(headers) - DeployHandler._logger.info( - "catch_up(%s) latest_policies[%s], removed_policies[%s], errored_policies[%s]", - catch_up, len(latest_policies), len(removed_policies), len(errored_policies)) + DeployHandler._logger.info("message: %s", msg_str) log_line = "post to deployment-handler {0} msg={1} headers={2}".format( DeployHandler._url_path, msg_str, headers_str) @@ -107,7 +94,7 @@ class DeployHandler(object): res = None try: res = DeployHandler._requests_session.post( - DeployHandler._url_path, json=msg, headers=headers + DeployHandler._url_path, json=message, headers=headers ) except requests.exceptions.RequestException as ex: error_msg = "failed to post to deployment-handler {0} {1} msg={2} headers={3}" \ diff --git a/policyhandler/policy_consts.py b/policyhandler/policy_consts.py index f6a1a9e..bcac080 100644 --- a/policyhandler/policy_consts.py +++ b/policyhandler/policy_consts.py @@ -1,6 +1,6 @@ # org.onap.dcae # ================================================================================ -# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -24,3 +24,11 @@ POLICY_VERSION = "policyVersion" POLICY_NAME = "policyName" POLICY_BODY = 'policy_body' POLICY_CONFIG = 'config' + +CATCH_UP = "catch_up" +LATEST_POLICIES = "latest_policies" +REMOVED_POLICIES = "removed_policies" +ERRORED_POLICIES = "errored_policies" +ERRORED_SCOPES = "errored_scopes" +SCOPE_PREFIXES = "scope_prefixes" +POLICY_FILTER = "policy_filter" diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py index 800c564..22ed640 100644 --- a/policyhandler/policy_rest.py +++ b/policyhandler/policy_rest.py @@ -19,18 +19,23 @@ """policy-client communicates with policy-engine thru REST API""" -import logging -import json import copy +import json +import logging import time from multiprocessing.dummy import Pool as ThreadPool + import requests from .config import Config -from .policy_consts import POLICY_ID, POLICY_NAME, POLICY_BODY, POLICY_CONFIG -from .onap.audit import REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode, AuditResponseCode +from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode, + AuditResponseCode) +from .policy_consts import (ERRORED_POLICIES, ERRORED_SCOPES, POLICY_BODY, + POLICY_CONFIG, POLICY_FILTER, POLICY_ID, + POLICY_NAME, SCOPE_PREFIXES, LATEST_POLICIES) from .policy_utils import PolicyUtils + class PolicyRest(object): """ policy-engine """ _logger = logging.getLogger("policy_handler.policy_rest") @@ -171,7 +176,7 @@ class PolicyRest(object): return res.status_code, res_data @staticmethod - def validate_policy(policy): + def _validate_policy(policy): """Validates the config on policy""" if not policy: return @@ -185,22 +190,6 @@ class PolicyRest(object): ) @staticmethod - def validate_policies(policies): - """Validate the config on policies. Returns (valid, errored) tuple""" - if not policies: - return None, policies - - valid_policies = {} - errored_policies = {} - for (policy_id, policy) in policies.iteritems(): - if PolicyRest.validate_policy(policy): - valid_policies[policy_id] = policy - else: - errored_policies[policy_id] = policy - - return valid_policies, errored_policies - - @staticmethod def get_latest_policy(aud_policy_id): """Get the latest policy for the policy_id from the policy-engine""" PolicyRest._lazy_init() @@ -260,7 +249,7 @@ class PolicyRest(object): return None audit.set_http_status_code(status_code) - if not PolicyRest.validate_policy(latest_policy): + if not PolicyRest._validate_policy(latest_policy): audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) audit.error( "received invalid policy from PDP: {0}".format(json.dumps(latest_policy)), @@ -369,7 +358,7 @@ class PolicyRest(object): get the latest policies by policy_filter or all the latest policies of the same scope from the policy-engine """ - audit, policy_filter, error_if_not_found = aud_policy_filter + audit, policy_filter, scope_prefix = aud_policy_filter str_policy_filter = json.dumps(policy_filter) PolicyRest._logger.debug("%s", str_policy_filter) @@ -379,8 +368,18 @@ class PolicyRest(object): str_policy_filter, json.dumps(policy_configs or [])) latest_policies = PolicyUtils.select_latest_policies(policy_configs) + + if scope_prefix and not policy_configs \ + and status_code != AuditHttpCode.DATA_NOT_FOUND_ERROR.value: + audit.warn("PDP error {0} on scope_prefix {1}".format(status_code, scope_prefix), + errorCode=AuditResponseCode.DATA_ERROR.value, + errorDescription=AuditResponseCode.get_human_text( + AuditResponseCode.DATA_ERROR) + ) + return None, latest_policies, scope_prefix + if not latest_policies: - if error_if_not_found: + if not scope_prefix: audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) audit.warn( "received no policies from PDP for policy_filter {0}: {1}" @@ -389,28 +388,38 @@ class PolicyRest(object): errorDescription=AuditResponseCode.get_human_text( AuditResponseCode.DATA_ERROR) ) - return None, latest_policies + return None, latest_policies, None audit.set_http_status_code(status_code) - return PolicyRest.validate_policies(latest_policies) + valid_policies = {} + errored_policies = {} + for (policy_id, policy) in latest_policies.iteritems(): + if PolicyRest._validate_policy(policy): + valid_policies[policy_id] = policy + else: + errored_policies[policy_id] = policy + return valid_policies, errored_policies, None @staticmethod def get_latest_policies(audit, policy_filter=None): """Get the latest policies of the same scope from the policy-engine""" PolicyRest._lazy_init() + result = {} aud_policy_filters = None str_metrics = None str_policy_filters = json.dumps(policy_filter or PolicyRest._scope_prefixes) if policy_filter is not None: - aud_policy_filters = [(audit, policy_filter, True)] + aud_policy_filters = [(audit, policy_filter, None)] str_metrics = "get_latest_policies for policy_filter {0}".format( str_policy_filters) + result[POLICY_FILTER] = copy.deepcopy(policy_filter) else: - aud_policy_filters = [(audit, {POLICY_NAME:scope_prefix + ".*"}, False) + aud_policy_filters = [(audit, {POLICY_NAME:scope_prefix + ".*"}, scope_prefix) for scope_prefix in PolicyRest._scope_prefixes] str_metrics = "get_latest_policies for scopes {0} {1}".format( \ len(PolicyRest._scope_prefixes), str_policy_filters) + result[SCOPE_PREFIXES] = copy.deepcopy(PolicyRest._scope_prefixes) PolicyRest._logger.debug("%s", str_policy_filters) audit.metrics_start(str_metrics) @@ -429,15 +438,16 @@ class PolicyRest(object): str_metrics, len(latest_policies), json.dumps(latest_policies)), \ targetEntity=PolicyRest._target_entity, targetServiceName=PolicyRest._url_get_config) - # latest_policies == [(valid_policies, errored_policies), ...] - valid_policies = dict( - pair for (vps, _) in latest_policies if vps for pair in vps.iteritems()) + # latest_policies == [(valid_policies, errored_policies, errored_scope_prefix), ...] + result[LATEST_POLICIES] = dict( + pair for (vps, _, _) in latest_policies if vps for pair in vps.iteritems()) - errored_policies = dict( - pair for (_, eps) in latest_policies if eps for pair in eps.iteritems()) + result[ERRORED_POLICIES] = dict( + pair for (_, eps, _) in latest_policies if eps for pair in eps.iteritems()) - PolicyRest._logger.debug( - "got policies for policy_filters: %s. valid_policies: %s errored_policies: %s", - str_policy_filters, json.dumps(valid_policies), json.dumps(errored_policies)) + result[ERRORED_SCOPES] = [esp for (_, _, esp) in latest_policies if esp] + + PolicyRest._logger.debug("got policies for policy_filters: %s. result: %s", + str_policy_filters, json.dumps(result)) - return valid_policies, errored_policies + return result diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py index 0e06f2c..2bce30b 100644 --- a/policyhandler/policy_updater.py +++ b/policyhandler/policy_updater.py @@ -1,6 +1,6 @@ # org.onap.dcae # ================================================================================ -# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,14 +19,15 @@ """policy-updater thread""" -import logging import json +import logging from Queue import Queue -from threading import Thread, Lock +from threading import Lock, Thread -from .policy_rest import PolicyRest from .deploy_handler import DeployHandler from .onap.audit import Audit +from .policy_consts import CATCH_UP, LATEST_POLICIES, REMOVED_POLICIES +from .policy_rest import PolicyRest class PolicyUpdater(Thread): """queue and handle the policy-updates in a separate thread""" @@ -72,7 +73,8 @@ class PolicyUpdater(Thread): updated_policies, removed_policies = PolicyRest.get_latest_updated_policies( (audit, policies_updated, policies_removed)) - DeployHandler.policy_update(audit, updated_policies, removed_policies=removed_policies) + message = {LATEST_POLICIES: updated_policies, REMOVED_POLICIES: removed_policies} + DeployHandler.policy_update(audit, message) audit.audit_done() self._queue.task_done() @@ -114,15 +116,16 @@ class PolicyUpdater(Thread): return False PolicyUpdater._logger.info("catch_up") - latest_policies, errored_policies = PolicyRest.get_latest_policies(aud_catch_up) + + result = PolicyRest.get_latest_policies(aud_catch_up) + result[CATCH_UP] = True if not aud_catch_up.is_success(): PolicyUpdater._logger.warn("not sending catch-up to deployment-handler due to errors") if not audit: self._queue.task_done() else: - DeployHandler.policy_update( - aud_catch_up, latest_policies, errored_policies=errored_policies, catch_up=True) + DeployHandler.policy_update(aud_catch_up, result) self._reset_queue() success, _, _ = aud_catch_up.audit_done() PolicyUpdater._logger.info("policy_handler health: %s", json.dumps(Audit.health())) diff --git a/policyhandler/web_server.py b/policyhandler/web_server.py index 5b3227d..17c06b4 100644 --- a/policyhandler/web_server.py +++ b/policyhandler/web_server.py @@ -79,16 +79,15 @@ class _PolicyWeb(object): PolicyWeb.logger.info("%s", req_info) - valid_policies, errored_policies = PolicyRest.get_latest_policies(audit) + result = PolicyRest.get_latest_policies(audit) - res = {"valid_policies": valid_policies, "errored_policies": errored_policies} - PolicyWeb.logger.info("result %s: %s", req_info, json.dumps(res)) + PolicyWeb.logger.info("result %s: %s", req_info, json.dumps(result)) - success, http_status_code, _ = audit.audit_done(result=json.dumps(res)) + success, http_status_code, _ = audit.audit_done(result=json.dumps(result)) if not success: cherrypy.response.status = http_status_code - return res + return result @cherrypy.expose @cherrypy.tools.json_out() @@ -153,16 +152,16 @@ class _PolicyWeb(object): PolicyWeb.logger.info("%s: policy_filter=%s headers=%s", \ req_info, str_policy_filter, json.dumps(cherrypy.request.headers)) - res, _ = PolicyRest.get_latest_policies(audit, policy_filter=policy_filter) or {} + result = PolicyRest.get_latest_policies(audit, policy_filter=policy_filter) or {} - PolicyWeb.logger.info("result %s: policy_filter=%s res=%s", \ - req_info, str_policy_filter, json.dumps(res)) + PolicyWeb.logger.info("result %s: policy_filter=%s result=%s", \ + req_info, str_policy_filter, json.dumps(result)) - success, http_status_code, _ = audit.audit_done(result=json.dumps(res)) + success, http_status_code, _ = audit.audit_done(result=json.dumps(result)) if not success: cherrypy.response.status = http_status_code - return res + return result @cherrypy.expose @cherrypy.tools.json_out() |