aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlex Shatov <alexs@att.com>2018-02-01 14:16:56 -0500
committerAlex Shatov <alexs@att.com>2018-02-01 14:16:56 -0500
commitac779d374ca12099eaeb8e5d89e65df37fd8a8f4 (patch)
tree181e0b72ead781dcab526124c81096980a552e8b
parent2322ef8736e839d62930d9b6c847ce818261c26c (diff)
improved message to deployment-handler and on API
* added errored_scopes and scope_prefixes to the message to deployment-handler - to prevent erroneous removal of policies * hardcoded condition for scope not found 404 at policy-engine to separate it from error on the scope retrieval 400 * adjusting the web API message in sync with notification to deployment-handler * unit test coverage 74% Change-Id: Ie736a1b7aee0631b6785669c6b765bd240dd77b8 Issue-ID: DCAEGEN2-249 Signed-off-by: Alex Shatov <alexs@att.com>
-rw-r--r--LICENSE.txt2
-rw-r--r--policyhandler/deploy_handler.py25
-rw-r--r--policyhandler/policy_consts.py10
-rw-r--r--policyhandler/policy_rest.py84
-rw-r--r--policyhandler/policy_updater.py19
-rw-r--r--policyhandler/web_server.py19
-rw-r--r--tests/test_policyhandler.py48
7 files changed, 115 insertions, 92 deletions
diff --git a/LICENSE.txt b/LICENSE.txt
index 69d5fc1..28665aa 100644
--- a/LICENSE.txt
+++ b/LICENSE.txt
@@ -1,7 +1,7 @@
/*
* ============LICENSE_START==========================================
* ===================================================================
-* Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+* Copyright © 2018 AT&T Intellectual Property. All rights reserved.
* ===================================================================
*
* Unless otherwise specified, all software contained herein is licensed
diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py
index 0dc86b9..306a637 100644
--- a/policyhandler/deploy_handler.py
+++ b/policyhandler/deploy_handler.py
@@ -1,6 +1,6 @@
# org.onap.dcae
# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -63,33 +63,20 @@ class DeployHandler(object):
DeployHandler._logger.info("DeployHandler url(%s)", DeployHandler._url)
@staticmethod
- def policy_update(audit, latest_policies, removed_policies=None,
- errored_policies=None, catch_up=False):
+ def policy_update(audit, message):
"""post policy_updated message to deploy-handler"""
- if not latest_policies and not removed_policies and not catch_up:
+ if not message:
return
- latest_policies = latest_policies or {}
- removed_policies = removed_policies or {}
- errored_policies = errored_policies or {}
-
DeployHandler._lazy_init()
- msg = {
- "catch_up" : catch_up,
- "latest_policies" : latest_policies,
- "removed_policies" : removed_policies,
- "errored_policies" : errored_policies
- }
sub_aud = Audit(aud_parent=audit, targetEntity=DeployHandler._target_entity,
targetServiceName=DeployHandler._url_path)
headers = {REQUEST_X_ECOMP_REQUESTID : sub_aud.request_id}
- msg_str = json.dumps(msg)
+ msg_str = json.dumps(message)
headers_str = json.dumps(headers)
- DeployHandler._logger.info(
- "catch_up(%s) latest_policies[%s], removed_policies[%s], errored_policies[%s]",
- catch_up, len(latest_policies), len(removed_policies), len(errored_policies))
+ DeployHandler._logger.info("message: %s", msg_str)
log_line = "post to deployment-handler {0} msg={1} headers={2}".format(
DeployHandler._url_path, msg_str, headers_str)
@@ -107,7 +94,7 @@ class DeployHandler(object):
res = None
try:
res = DeployHandler._requests_session.post(
- DeployHandler._url_path, json=msg, headers=headers
+ DeployHandler._url_path, json=message, headers=headers
)
except requests.exceptions.RequestException as ex:
error_msg = "failed to post to deployment-handler {0} {1} msg={2} headers={3}" \
diff --git a/policyhandler/policy_consts.py b/policyhandler/policy_consts.py
index f6a1a9e..bcac080 100644
--- a/policyhandler/policy_consts.py
+++ b/policyhandler/policy_consts.py
@@ -1,6 +1,6 @@
# org.onap.dcae
# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -24,3 +24,11 @@ POLICY_VERSION = "policyVersion"
POLICY_NAME = "policyName"
POLICY_BODY = 'policy_body'
POLICY_CONFIG = 'config'
+
+CATCH_UP = "catch_up"
+LATEST_POLICIES = "latest_policies"
+REMOVED_POLICIES = "removed_policies"
+ERRORED_POLICIES = "errored_policies"
+ERRORED_SCOPES = "errored_scopes"
+SCOPE_PREFIXES = "scope_prefixes"
+POLICY_FILTER = "policy_filter"
diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py
index 800c564..22ed640 100644
--- a/policyhandler/policy_rest.py
+++ b/policyhandler/policy_rest.py
@@ -19,18 +19,23 @@
"""policy-client communicates with policy-engine thru REST API"""
-import logging
-import json
import copy
+import json
+import logging
import time
from multiprocessing.dummy import Pool as ThreadPool
+
import requests
from .config import Config
-from .policy_consts import POLICY_ID, POLICY_NAME, POLICY_BODY, POLICY_CONFIG
-from .onap.audit import REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode, AuditResponseCode
+from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode,
+ AuditResponseCode)
+from .policy_consts import (ERRORED_POLICIES, ERRORED_SCOPES, POLICY_BODY,
+ POLICY_CONFIG, POLICY_FILTER, POLICY_ID,
+ POLICY_NAME, SCOPE_PREFIXES, LATEST_POLICIES)
from .policy_utils import PolicyUtils
+
class PolicyRest(object):
""" policy-engine """
_logger = logging.getLogger("policy_handler.policy_rest")
@@ -171,7 +176,7 @@ class PolicyRest(object):
return res.status_code, res_data
@staticmethod
- def validate_policy(policy):
+ def _validate_policy(policy):
"""Validates the config on policy"""
if not policy:
return
@@ -185,22 +190,6 @@ class PolicyRest(object):
)
@staticmethod
- def validate_policies(policies):
- """Validate the config on policies. Returns (valid, errored) tuple"""
- if not policies:
- return None, policies
-
- valid_policies = {}
- errored_policies = {}
- for (policy_id, policy) in policies.iteritems():
- if PolicyRest.validate_policy(policy):
- valid_policies[policy_id] = policy
- else:
- errored_policies[policy_id] = policy
-
- return valid_policies, errored_policies
-
- @staticmethod
def get_latest_policy(aud_policy_id):
"""Get the latest policy for the policy_id from the policy-engine"""
PolicyRest._lazy_init()
@@ -260,7 +249,7 @@ class PolicyRest(object):
return None
audit.set_http_status_code(status_code)
- if not PolicyRest.validate_policy(latest_policy):
+ if not PolicyRest._validate_policy(latest_policy):
audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
audit.error(
"received invalid policy from PDP: {0}".format(json.dumps(latest_policy)),
@@ -369,7 +358,7 @@ class PolicyRest(object):
get the latest policies by policy_filter
or all the latest policies of the same scope from the policy-engine
"""
- audit, policy_filter, error_if_not_found = aud_policy_filter
+ audit, policy_filter, scope_prefix = aud_policy_filter
str_policy_filter = json.dumps(policy_filter)
PolicyRest._logger.debug("%s", str_policy_filter)
@@ -379,8 +368,18 @@ class PolicyRest(object):
str_policy_filter, json.dumps(policy_configs or []))
latest_policies = PolicyUtils.select_latest_policies(policy_configs)
+
+ if scope_prefix and not policy_configs \
+ and status_code != AuditHttpCode.DATA_NOT_FOUND_ERROR.value:
+ audit.warn("PDP error {0} on scope_prefix {1}".format(status_code, scope_prefix),
+ errorCode=AuditResponseCode.DATA_ERROR.value,
+ errorDescription=AuditResponseCode.get_human_text(
+ AuditResponseCode.DATA_ERROR)
+ )
+ return None, latest_policies, scope_prefix
+
if not latest_policies:
- if error_if_not_found:
+ if not scope_prefix:
audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
audit.warn(
"received no policies from PDP for policy_filter {0}: {1}"
@@ -389,28 +388,38 @@ class PolicyRest(object):
errorDescription=AuditResponseCode.get_human_text(
AuditResponseCode.DATA_ERROR)
)
- return None, latest_policies
+ return None, latest_policies, None
audit.set_http_status_code(status_code)
- return PolicyRest.validate_policies(latest_policies)
+ valid_policies = {}
+ errored_policies = {}
+ for (policy_id, policy) in latest_policies.iteritems():
+ if PolicyRest._validate_policy(policy):
+ valid_policies[policy_id] = policy
+ else:
+ errored_policies[policy_id] = policy
+ return valid_policies, errored_policies, None
@staticmethod
def get_latest_policies(audit, policy_filter=None):
"""Get the latest policies of the same scope from the policy-engine"""
PolicyRest._lazy_init()
+ result = {}
aud_policy_filters = None
str_metrics = None
str_policy_filters = json.dumps(policy_filter or PolicyRest._scope_prefixes)
if policy_filter is not None:
- aud_policy_filters = [(audit, policy_filter, True)]
+ aud_policy_filters = [(audit, policy_filter, None)]
str_metrics = "get_latest_policies for policy_filter {0}".format(
str_policy_filters)
+ result[POLICY_FILTER] = copy.deepcopy(policy_filter)
else:
- aud_policy_filters = [(audit, {POLICY_NAME:scope_prefix + ".*"}, False)
+ aud_policy_filters = [(audit, {POLICY_NAME:scope_prefix + ".*"}, scope_prefix)
for scope_prefix in PolicyRest._scope_prefixes]
str_metrics = "get_latest_policies for scopes {0} {1}".format( \
len(PolicyRest._scope_prefixes), str_policy_filters)
+ result[SCOPE_PREFIXES] = copy.deepcopy(PolicyRest._scope_prefixes)
PolicyRest._logger.debug("%s", str_policy_filters)
audit.metrics_start(str_metrics)
@@ -429,15 +438,16 @@ class PolicyRest(object):
str_metrics, len(latest_policies), json.dumps(latest_policies)), \
targetEntity=PolicyRest._target_entity, targetServiceName=PolicyRest._url_get_config)
- # latest_policies == [(valid_policies, errored_policies), ...]
- valid_policies = dict(
- pair for (vps, _) in latest_policies if vps for pair in vps.iteritems())
+ # latest_policies == [(valid_policies, errored_policies, errored_scope_prefix), ...]
+ result[LATEST_POLICIES] = dict(
+ pair for (vps, _, _) in latest_policies if vps for pair in vps.iteritems())
- errored_policies = dict(
- pair for (_, eps) in latest_policies if eps for pair in eps.iteritems())
+ result[ERRORED_POLICIES] = dict(
+ pair for (_, eps, _) in latest_policies if eps for pair in eps.iteritems())
- PolicyRest._logger.debug(
- "got policies for policy_filters: %s. valid_policies: %s errored_policies: %s",
- str_policy_filters, json.dumps(valid_policies), json.dumps(errored_policies))
+ result[ERRORED_SCOPES] = [esp for (_, _, esp) in latest_policies if esp]
+
+ PolicyRest._logger.debug("got policies for policy_filters: %s. result: %s",
+ str_policy_filters, json.dumps(result))
- return valid_policies, errored_policies
+ return result
diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py
index 0e06f2c..2bce30b 100644
--- a/policyhandler/policy_updater.py
+++ b/policyhandler/policy_updater.py
@@ -1,6 +1,6 @@
# org.onap.dcae
# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -19,14 +19,15 @@
"""policy-updater thread"""
-import logging
import json
+import logging
from Queue import Queue
-from threading import Thread, Lock
+from threading import Lock, Thread
-from .policy_rest import PolicyRest
from .deploy_handler import DeployHandler
from .onap.audit import Audit
+from .policy_consts import CATCH_UP, LATEST_POLICIES, REMOVED_POLICIES
+from .policy_rest import PolicyRest
class PolicyUpdater(Thread):
"""queue and handle the policy-updates in a separate thread"""
@@ -72,7 +73,8 @@ class PolicyUpdater(Thread):
updated_policies, removed_policies = PolicyRest.get_latest_updated_policies(
(audit, policies_updated, policies_removed))
- DeployHandler.policy_update(audit, updated_policies, removed_policies=removed_policies)
+ message = {LATEST_POLICIES: updated_policies, REMOVED_POLICIES: removed_policies}
+ DeployHandler.policy_update(audit, message)
audit.audit_done()
self._queue.task_done()
@@ -114,15 +116,16 @@ class PolicyUpdater(Thread):
return False
PolicyUpdater._logger.info("catch_up")
- latest_policies, errored_policies = PolicyRest.get_latest_policies(aud_catch_up)
+
+ result = PolicyRest.get_latest_policies(aud_catch_up)
+ result[CATCH_UP] = True
if not aud_catch_up.is_success():
PolicyUpdater._logger.warn("not sending catch-up to deployment-handler due to errors")
if not audit:
self._queue.task_done()
else:
- DeployHandler.policy_update(
- aud_catch_up, latest_policies, errored_policies=errored_policies, catch_up=True)
+ DeployHandler.policy_update(aud_catch_up, result)
self._reset_queue()
success, _, _ = aud_catch_up.audit_done()
PolicyUpdater._logger.info("policy_handler health: %s", json.dumps(Audit.health()))
diff --git a/policyhandler/web_server.py b/policyhandler/web_server.py
index 5b3227d..17c06b4 100644
--- a/policyhandler/web_server.py
+++ b/policyhandler/web_server.py
@@ -79,16 +79,15 @@ class _PolicyWeb(object):
PolicyWeb.logger.info("%s", req_info)
- valid_policies, errored_policies = PolicyRest.get_latest_policies(audit)
+ result = PolicyRest.get_latest_policies(audit)
- res = {"valid_policies": valid_policies, "errored_policies": errored_policies}
- PolicyWeb.logger.info("result %s: %s", req_info, json.dumps(res))
+ PolicyWeb.logger.info("result %s: %s", req_info, json.dumps(result))
- success, http_status_code, _ = audit.audit_done(result=json.dumps(res))
+ success, http_status_code, _ = audit.audit_done(result=json.dumps(result))
if not success:
cherrypy.response.status = http_status_code
- return res
+ return result
@cherrypy.expose
@cherrypy.tools.json_out()
@@ -153,16 +152,16 @@ class _PolicyWeb(object):
PolicyWeb.logger.info("%s: policy_filter=%s headers=%s", \
req_info, str_policy_filter, json.dumps(cherrypy.request.headers))
- res, _ = PolicyRest.get_latest_policies(audit, policy_filter=policy_filter) or {}
+ result = PolicyRest.get_latest_policies(audit, policy_filter=policy_filter) or {}
- PolicyWeb.logger.info("result %s: policy_filter=%s res=%s", \
- req_info, str_policy_filter, json.dumps(res))
+ PolicyWeb.logger.info("result %s: policy_filter=%s result=%s", \
+ req_info, str_policy_filter, json.dumps(result))
- success, http_status_code, _ = audit.audit_done(result=json.dumps(res))
+ success, http_status_code, _ = audit.audit_done(result=json.dumps(result))
if not success:
cherrypy.response.status = http_status_code
- return res
+ return result
@cherrypy.expose
@cherrypy.tools.json_out()
diff --git a/tests/test_policyhandler.py b/tests/test_policyhandler.py
index 2530dd3..aad965a 100644
--- a/tests/test_policyhandler.py
+++ b/tests/test_policyhandler.py
@@ -1,7 +1,7 @@
# ============LICENSE_START=======================================================
# org.onap.dcae
# ================================================================================
-# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -30,7 +30,6 @@ import uuid
from datetime import datetime
import pytest
-
import cherrypy
from cherrypy.test.helper import CPWebCase
@@ -39,8 +38,10 @@ from policyhandler.deploy_handler import DeployHandler
from policyhandler.discovery import DiscoveryClient
from policyhandler.onap.audit import (REQUEST_X_ECOMP_REQUESTID, Audit,
AuditHttpCode)
-from policyhandler.policy_consts import (POLICY_BODY, POLICY_CONFIG, POLICY_ID,
- POLICY_NAME, POLICY_VERSION)
+from policyhandler.policy_consts import (ERRORED_POLICIES, ERRORED_SCOPES,
+ LATEST_POLICIES, POLICY_BODY,
+ POLICY_CONFIG, POLICY_ID, POLICY_NAME,
+ POLICY_VERSION, SCOPE_PREFIXES)
from policyhandler.policy_handler import LogWriter
from policyhandler.policy_receiver import (LOADED_POLICIES, POLICY_VER,
REMOVED_POLICIES, PolicyReceiver)
@@ -162,6 +163,11 @@ class MonkeyPolicyBody(object):
val_1 = policy_body_1[key]
val_2 = policy_body_2[key]
+ if isinstance(val_1, list) and isinstance(val_2, list):
+ if sorted(val_1) != sorted(val_1):
+ return False
+ continue
+
if isinstance(val_1, dict) \
and not MonkeyPolicyBody.is_the_same_dict(val_1, val_2):
return False
@@ -216,19 +222,27 @@ class MonkeyPolicyEngine(object):
@staticmethod
def gen_all_policies_latest():
"""generate all latest policies"""
- return dict(
- MonkeyPolicyEngine.gen_policy_latest(policy_index)
- for policy_index in range(len(MonkeyPolicyEngine.LOREM_IPSUM))
- )
+ return {
+ LATEST_POLICIES: dict(MonkeyPolicyEngine.gen_policy_latest(policy_index)
+ for policy_index in range(len(MonkeyPolicyEngine.LOREM_IPSUM))),
+ ERRORED_SCOPES: ["DCAE.Config_*"],
+ SCOPE_PREFIXES: ["DCAE.Config_*"],
+ ERRORED_POLICIES: {}
+ }
@staticmethod
def gen_policies_latest(match_to_policy_name):
"""generate all latest policies"""
- return dict(
- (k, v)
- for k, v in MonkeyPolicyEngine.gen_all_policies_latest().iteritems()
- if re.match(match_to_policy_name, k)
- )
+ return {
+ LATEST_POLICIES:
+ dict((k, v)
+ for k, v in MonkeyPolicyEngine.gen_all_policies_latest()
+ [LATEST_POLICIES].iteritems()
+ if re.match(match_to_policy_name, k)),
+ ERRORED_SCOPES: [],
+ ERRORED_POLICIES: {}
+ }
+
MonkeyPolicyEngine.init()
@@ -415,6 +429,7 @@ class WebServerTest(CPWebCase):
def test_web_all_policies_latest(self):
"""test GET /policies_latest"""
expected_policies = MonkeyPolicyEngine.gen_all_policies_latest()
+ expected_policies = expected_policies[LATEST_POLICIES]
result = self.getPage("/policies_latest")
Settings.logger.info("result: %s", result)
@@ -422,8 +437,8 @@ class WebServerTest(CPWebCase):
self.assertStatus('200 OK')
policies_latest = json.loads(self.body)
- self.assertIn("valid_policies", policies_latest)
- policies_latest = policies_latest["valid_policies"]
+ self.assertIn(LATEST_POLICIES, policies_latest)
+ policies_latest = policies_latest[LATEST_POLICIES]
Settings.logger.info("policies_latest: %s", json.dumps(policies_latest))
Settings.logger.info("expected_policies: %s", json.dumps(expected_policies))
@@ -434,6 +449,7 @@ class WebServerTest(CPWebCase):
"""test POST /policies_latest with policyName"""
match_to_policy_name = Config.config["scope_prefixes"][0] + "amet.*"
expected_policies = MonkeyPolicyEngine.gen_policies_latest(match_to_policy_name)
+ expected_policies = expected_policies[LATEST_POLICIES]
body = json.dumps({POLICY_NAME: match_to_policy_name})
result = self.getPage("/policies_latest", method='POST',
@@ -447,7 +463,7 @@ class WebServerTest(CPWebCase):
Settings.logger.info("body: %s", self.body)
self.assertStatus('200 OK')
- policies_latest = json.loads(self.body)
+ policies_latest = json.loads(self.body)[LATEST_POLICIES]
Settings.logger.info("policies_latest: %s", json.dumps(policies_latest))
Settings.logger.info("expected_policies: %s", json.dumps(expected_policies))