From ac779d374ca12099eaeb8e5d89e65df37fd8a8f4 Mon Sep 17 00:00:00 2001 From: Alex Shatov Date: Thu, 1 Feb 2018 14:16:56 -0500 Subject: improved message to deployment-handler and on API * added errored_scopes and scope_prefixes to the message to deployment-handler - to prevent erroneous removal of policies * hardcoded condition for scope not found 404 at policy-engine to separate it from error on the scope retrieval 400 * adjusting the web API message in sync with notification to deployment-handler * unit test coverage 74% Change-Id: Ie736a1b7aee0631b6785669c6b765bd240dd77b8 Issue-ID: DCAEGEN2-249 Signed-off-by: Alex Shatov --- LICENSE.txt | 2 +- policyhandler/deploy_handler.py | 25 +++--------- policyhandler/policy_consts.py | 10 ++++- policyhandler/policy_rest.py | 84 +++++++++++++++++++++++------------------ policyhandler/policy_updater.py | 19 ++++++---- policyhandler/web_server.py | 19 +++++----- tests/test_policyhandler.py | 48 +++++++++++++++-------- 7 files changed, 115 insertions(+), 92 deletions(-) diff --git a/LICENSE.txt b/LICENSE.txt index 69d5fc1..28665aa 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -1,7 +1,7 @@ /* * ============LICENSE_START========================================== * =================================================================== -* Copyright © 2017 AT&T Intellectual Property. All rights reserved. +* Copyright © 2018 AT&T Intellectual Property. All rights reserved. * =================================================================== * * Unless otherwise specified, all software contained herein is licensed diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py index 0dc86b9..306a637 100644 --- a/policyhandler/deploy_handler.py +++ b/policyhandler/deploy_handler.py @@ -1,6 +1,6 @@ # org.onap.dcae # ================================================================================ -# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -63,33 +63,20 @@ class DeployHandler(object): DeployHandler._logger.info("DeployHandler url(%s)", DeployHandler._url) @staticmethod - def policy_update(audit, latest_policies, removed_policies=None, - errored_policies=None, catch_up=False): + def policy_update(audit, message): """post policy_updated message to deploy-handler""" - if not latest_policies and not removed_policies and not catch_up: + if not message: return - latest_policies = latest_policies or {} - removed_policies = removed_policies or {} - errored_policies = errored_policies or {} - DeployHandler._lazy_init() - msg = { - "catch_up" : catch_up, - "latest_policies" : latest_policies, - "removed_policies" : removed_policies, - "errored_policies" : errored_policies - } sub_aud = Audit(aud_parent=audit, targetEntity=DeployHandler._target_entity, targetServiceName=DeployHandler._url_path) headers = {REQUEST_X_ECOMP_REQUESTID : sub_aud.request_id} - msg_str = json.dumps(msg) + msg_str = json.dumps(message) headers_str = json.dumps(headers) - DeployHandler._logger.info( - "catch_up(%s) latest_policies[%s], removed_policies[%s], errored_policies[%s]", - catch_up, len(latest_policies), len(removed_policies), len(errored_policies)) + DeployHandler._logger.info("message: %s", msg_str) log_line = "post to deployment-handler {0} msg={1} headers={2}".format( DeployHandler._url_path, msg_str, headers_str) @@ -107,7 +94,7 @@ class DeployHandler(object): res = None try: res = DeployHandler._requests_session.post( - DeployHandler._url_path, json=msg, headers=headers + DeployHandler._url_path, json=message, headers=headers ) except requests.exceptions.RequestException as ex: error_msg = "failed to post to deployment-handler {0} {1} msg={2} headers={3}" \ diff --git a/policyhandler/policy_consts.py b/policyhandler/policy_consts.py index f6a1a9e..bcac080 100644 --- a/policyhandler/policy_consts.py +++ b/policyhandler/policy_consts.py @@ -1,6 +1,6 @@ # org.onap.dcae # ================================================================================ -# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -24,3 +24,11 @@ POLICY_VERSION = "policyVersion" POLICY_NAME = "policyName" POLICY_BODY = 'policy_body' POLICY_CONFIG = 'config' + +CATCH_UP = "catch_up" +LATEST_POLICIES = "latest_policies" +REMOVED_POLICIES = "removed_policies" +ERRORED_POLICIES = "errored_policies" +ERRORED_SCOPES = "errored_scopes" +SCOPE_PREFIXES = "scope_prefixes" +POLICY_FILTER = "policy_filter" diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py index 800c564..22ed640 100644 --- a/policyhandler/policy_rest.py +++ b/policyhandler/policy_rest.py @@ -19,18 +19,23 @@ """policy-client communicates with policy-engine thru REST API""" -import logging -import json import copy +import json +import logging import time from multiprocessing.dummy import Pool as ThreadPool + import requests from .config import Config -from .policy_consts import POLICY_ID, POLICY_NAME, POLICY_BODY, POLICY_CONFIG -from .onap.audit import REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode, AuditResponseCode +from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode, + AuditResponseCode) +from .policy_consts import (ERRORED_POLICIES, ERRORED_SCOPES, POLICY_BODY, + POLICY_CONFIG, POLICY_FILTER, POLICY_ID, + POLICY_NAME, SCOPE_PREFIXES, LATEST_POLICIES) from .policy_utils import PolicyUtils + class PolicyRest(object): """ policy-engine """ _logger = logging.getLogger("policy_handler.policy_rest") @@ -171,7 +176,7 @@ class PolicyRest(object): return res.status_code, res_data @staticmethod - def validate_policy(policy): + def _validate_policy(policy): """Validates the config on policy""" if not policy: return @@ -184,22 +189,6 @@ class PolicyRest(object): and policy_body.get(POLICY_CONFIG) ) - @staticmethod - def validate_policies(policies): - """Validate the config on policies. Returns (valid, errored) tuple""" - if not policies: - return None, policies - - valid_policies = {} - errored_policies = {} - for (policy_id, policy) in policies.iteritems(): - if PolicyRest.validate_policy(policy): - valid_policies[policy_id] = policy - else: - errored_policies[policy_id] = policy - - return valid_policies, errored_policies - @staticmethod def get_latest_policy(aud_policy_id): """Get the latest policy for the policy_id from the policy-engine""" @@ -260,7 +249,7 @@ class PolicyRest(object): return None audit.set_http_status_code(status_code) - if not PolicyRest.validate_policy(latest_policy): + if not PolicyRest._validate_policy(latest_policy): audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) audit.error( "received invalid policy from PDP: {0}".format(json.dumps(latest_policy)), @@ -369,7 +358,7 @@ class PolicyRest(object): get the latest policies by policy_filter or all the latest policies of the same scope from the policy-engine """ - audit, policy_filter, error_if_not_found = aud_policy_filter + audit, policy_filter, scope_prefix = aud_policy_filter str_policy_filter = json.dumps(policy_filter) PolicyRest._logger.debug("%s", str_policy_filter) @@ -379,8 +368,18 @@ class PolicyRest(object): str_policy_filter, json.dumps(policy_configs or [])) latest_policies = PolicyUtils.select_latest_policies(policy_configs) + + if scope_prefix and not policy_configs \ + and status_code != AuditHttpCode.DATA_NOT_FOUND_ERROR.value: + audit.warn("PDP error {0} on scope_prefix {1}".format(status_code, scope_prefix), + errorCode=AuditResponseCode.DATA_ERROR.value, + errorDescription=AuditResponseCode.get_human_text( + AuditResponseCode.DATA_ERROR) + ) + return None, latest_policies, scope_prefix + if not latest_policies: - if error_if_not_found: + if not scope_prefix: audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) audit.warn( "received no policies from PDP for policy_filter {0}: {1}" @@ -389,28 +388,38 @@ class PolicyRest(object): errorDescription=AuditResponseCode.get_human_text( AuditResponseCode.DATA_ERROR) ) - return None, latest_policies + return None, latest_policies, None audit.set_http_status_code(status_code) - return PolicyRest.validate_policies(latest_policies) + valid_policies = {} + errored_policies = {} + for (policy_id, policy) in latest_policies.iteritems(): + if PolicyRest._validate_policy(policy): + valid_policies[policy_id] = policy + else: + errored_policies[policy_id] = policy + return valid_policies, errored_policies, None @staticmethod def get_latest_policies(audit, policy_filter=None): """Get the latest policies of the same scope from the policy-engine""" PolicyRest._lazy_init() + result = {} aud_policy_filters = None str_metrics = None str_policy_filters = json.dumps(policy_filter or PolicyRest._scope_prefixes) if policy_filter is not None: - aud_policy_filters = [(audit, policy_filter, True)] + aud_policy_filters = [(audit, policy_filter, None)] str_metrics = "get_latest_policies for policy_filter {0}".format( str_policy_filters) + result[POLICY_FILTER] = copy.deepcopy(policy_filter) else: - aud_policy_filters = [(audit, {POLICY_NAME:scope_prefix + ".*"}, False) + aud_policy_filters = [(audit, {POLICY_NAME:scope_prefix + ".*"}, scope_prefix) for scope_prefix in PolicyRest._scope_prefixes] str_metrics = "get_latest_policies for scopes {0} {1}".format( \ len(PolicyRest._scope_prefixes), str_policy_filters) + result[SCOPE_PREFIXES] = copy.deepcopy(PolicyRest._scope_prefixes) PolicyRest._logger.debug("%s", str_policy_filters) audit.metrics_start(str_metrics) @@ -429,15 +438,16 @@ class PolicyRest(object): str_metrics, len(latest_policies), json.dumps(latest_policies)), \ targetEntity=PolicyRest._target_entity, targetServiceName=PolicyRest._url_get_config) - # latest_policies == [(valid_policies, errored_policies), ...] - valid_policies = dict( - pair for (vps, _) in latest_policies if vps for pair in vps.iteritems()) + # latest_policies == [(valid_policies, errored_policies, errored_scope_prefix), ...] + result[LATEST_POLICIES] = dict( + pair for (vps, _, _) in latest_policies if vps for pair in vps.iteritems()) - errored_policies = dict( - pair for (_, eps) in latest_policies if eps for pair in eps.iteritems()) + result[ERRORED_POLICIES] = dict( + pair for (_, eps, _) in latest_policies if eps for pair in eps.iteritems()) - PolicyRest._logger.debug( - "got policies for policy_filters: %s. valid_policies: %s errored_policies: %s", - str_policy_filters, json.dumps(valid_policies), json.dumps(errored_policies)) + result[ERRORED_SCOPES] = [esp for (_, _, esp) in latest_policies if esp] + + PolicyRest._logger.debug("got policies for policy_filters: %s. result: %s", + str_policy_filters, json.dumps(result)) - return valid_policies, errored_policies + return result diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py index 0e06f2c..2bce30b 100644 --- a/policyhandler/policy_updater.py +++ b/policyhandler/policy_updater.py @@ -1,6 +1,6 @@ # org.onap.dcae # ================================================================================ -# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,14 +19,15 @@ """policy-updater thread""" -import logging import json +import logging from Queue import Queue -from threading import Thread, Lock +from threading import Lock, Thread -from .policy_rest import PolicyRest from .deploy_handler import DeployHandler from .onap.audit import Audit +from .policy_consts import CATCH_UP, LATEST_POLICIES, REMOVED_POLICIES +from .policy_rest import PolicyRest class PolicyUpdater(Thread): """queue and handle the policy-updates in a separate thread""" @@ -72,7 +73,8 @@ class PolicyUpdater(Thread): updated_policies, removed_policies = PolicyRest.get_latest_updated_policies( (audit, policies_updated, policies_removed)) - DeployHandler.policy_update(audit, updated_policies, removed_policies=removed_policies) + message = {LATEST_POLICIES: updated_policies, REMOVED_POLICIES: removed_policies} + DeployHandler.policy_update(audit, message) audit.audit_done() self._queue.task_done() @@ -114,15 +116,16 @@ class PolicyUpdater(Thread): return False PolicyUpdater._logger.info("catch_up") - latest_policies, errored_policies = PolicyRest.get_latest_policies(aud_catch_up) + + result = PolicyRest.get_latest_policies(aud_catch_up) + result[CATCH_UP] = True if not aud_catch_up.is_success(): PolicyUpdater._logger.warn("not sending catch-up to deployment-handler due to errors") if not audit: self._queue.task_done() else: - DeployHandler.policy_update( - aud_catch_up, latest_policies, errored_policies=errored_policies, catch_up=True) + DeployHandler.policy_update(aud_catch_up, result) self._reset_queue() success, _, _ = aud_catch_up.audit_done() PolicyUpdater._logger.info("policy_handler health: %s", json.dumps(Audit.health())) diff --git a/policyhandler/web_server.py b/policyhandler/web_server.py index 5b3227d..17c06b4 100644 --- a/policyhandler/web_server.py +++ b/policyhandler/web_server.py @@ -79,16 +79,15 @@ class _PolicyWeb(object): PolicyWeb.logger.info("%s", req_info) - valid_policies, errored_policies = PolicyRest.get_latest_policies(audit) + result = PolicyRest.get_latest_policies(audit) - res = {"valid_policies": valid_policies, "errored_policies": errored_policies} - PolicyWeb.logger.info("result %s: %s", req_info, json.dumps(res)) + PolicyWeb.logger.info("result %s: %s", req_info, json.dumps(result)) - success, http_status_code, _ = audit.audit_done(result=json.dumps(res)) + success, http_status_code, _ = audit.audit_done(result=json.dumps(result)) if not success: cherrypy.response.status = http_status_code - return res + return result @cherrypy.expose @cherrypy.tools.json_out() @@ -153,16 +152,16 @@ class _PolicyWeb(object): PolicyWeb.logger.info("%s: policy_filter=%s headers=%s", \ req_info, str_policy_filter, json.dumps(cherrypy.request.headers)) - res, _ = PolicyRest.get_latest_policies(audit, policy_filter=policy_filter) or {} + result = PolicyRest.get_latest_policies(audit, policy_filter=policy_filter) or {} - PolicyWeb.logger.info("result %s: policy_filter=%s res=%s", \ - req_info, str_policy_filter, json.dumps(res)) + PolicyWeb.logger.info("result %s: policy_filter=%s result=%s", \ + req_info, str_policy_filter, json.dumps(result)) - success, http_status_code, _ = audit.audit_done(result=json.dumps(res)) + success, http_status_code, _ = audit.audit_done(result=json.dumps(result)) if not success: cherrypy.response.status = http_status_code - return res + return result @cherrypy.expose @cherrypy.tools.json_out() diff --git a/tests/test_policyhandler.py b/tests/test_policyhandler.py index 2530dd3..aad965a 100644 --- a/tests/test_policyhandler.py +++ b/tests/test_policyhandler.py @@ -1,7 +1,7 @@ # ============LICENSE_START======================================================= # org.onap.dcae # ================================================================================ -# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -30,7 +30,6 @@ import uuid from datetime import datetime import pytest - import cherrypy from cherrypy.test.helper import CPWebCase @@ -39,8 +38,10 @@ from policyhandler.deploy_handler import DeployHandler from policyhandler.discovery import DiscoveryClient from policyhandler.onap.audit import (REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode) -from policyhandler.policy_consts import (POLICY_BODY, POLICY_CONFIG, POLICY_ID, - POLICY_NAME, POLICY_VERSION) +from policyhandler.policy_consts import (ERRORED_POLICIES, ERRORED_SCOPES, + LATEST_POLICIES, POLICY_BODY, + POLICY_CONFIG, POLICY_ID, POLICY_NAME, + POLICY_VERSION, SCOPE_PREFIXES) from policyhandler.policy_handler import LogWriter from policyhandler.policy_receiver import (LOADED_POLICIES, POLICY_VER, REMOVED_POLICIES, PolicyReceiver) @@ -162,6 +163,11 @@ class MonkeyPolicyBody(object): val_1 = policy_body_1[key] val_2 = policy_body_2[key] + if isinstance(val_1, list) and isinstance(val_2, list): + if sorted(val_1) != sorted(val_1): + return False + continue + if isinstance(val_1, dict) \ and not MonkeyPolicyBody.is_the_same_dict(val_1, val_2): return False @@ -216,19 +222,27 @@ class MonkeyPolicyEngine(object): @staticmethod def gen_all_policies_latest(): """generate all latest policies""" - return dict( - MonkeyPolicyEngine.gen_policy_latest(policy_index) - for policy_index in range(len(MonkeyPolicyEngine.LOREM_IPSUM)) - ) + return { + LATEST_POLICIES: dict(MonkeyPolicyEngine.gen_policy_latest(policy_index) + for policy_index in range(len(MonkeyPolicyEngine.LOREM_IPSUM))), + ERRORED_SCOPES: ["DCAE.Config_*"], + SCOPE_PREFIXES: ["DCAE.Config_*"], + ERRORED_POLICIES: {} + } @staticmethod def gen_policies_latest(match_to_policy_name): """generate all latest policies""" - return dict( - (k, v) - for k, v in MonkeyPolicyEngine.gen_all_policies_latest().iteritems() - if re.match(match_to_policy_name, k) - ) + return { + LATEST_POLICIES: + dict((k, v) + for k, v in MonkeyPolicyEngine.gen_all_policies_latest() + [LATEST_POLICIES].iteritems() + if re.match(match_to_policy_name, k)), + ERRORED_SCOPES: [], + ERRORED_POLICIES: {} + } + MonkeyPolicyEngine.init() @@ -415,6 +429,7 @@ class WebServerTest(CPWebCase): def test_web_all_policies_latest(self): """test GET /policies_latest""" expected_policies = MonkeyPolicyEngine.gen_all_policies_latest() + expected_policies = expected_policies[LATEST_POLICIES] result = self.getPage("/policies_latest") Settings.logger.info("result: %s", result) @@ -422,8 +437,8 @@ class WebServerTest(CPWebCase): self.assertStatus('200 OK') policies_latest = json.loads(self.body) - self.assertIn("valid_policies", policies_latest) - policies_latest = policies_latest["valid_policies"] + self.assertIn(LATEST_POLICIES, policies_latest) + policies_latest = policies_latest[LATEST_POLICIES] Settings.logger.info("policies_latest: %s", json.dumps(policies_latest)) Settings.logger.info("expected_policies: %s", json.dumps(expected_policies)) @@ -434,6 +449,7 @@ class WebServerTest(CPWebCase): """test POST /policies_latest with policyName""" match_to_policy_name = Config.config["scope_prefixes"][0] + "amet.*" expected_policies = MonkeyPolicyEngine.gen_policies_latest(match_to_policy_name) + expected_policies = expected_policies[LATEST_POLICIES] body = json.dumps({POLICY_NAME: match_to_policy_name}) result = self.getPage("/policies_latest", method='POST', @@ -447,7 +463,7 @@ class WebServerTest(CPWebCase): Settings.logger.info("body: %s", self.body) self.assertStatus('200 OK') - policies_latest = json.loads(self.body) + policies_latest = json.loads(self.body)[LATEST_POLICIES] Settings.logger.info("policies_latest: %s", json.dumps(policies_latest)) Settings.logger.info("expected_policies: %s", json.dumps(expected_policies)) -- cgit 1.2.3-korg