aboutsummaryrefslogtreecommitdiffstats
path: root/policyhandler
diff options
context:
space:
mode:
authorAlex Shatov <alexs@att.com>2020-02-27 12:45:54 -0500
committerAlex Shatov <alexs@att.com>2020-02-27 12:45:54 -0500
commit78ff88f9b3a3d32f941b3b9fedc2abfbaba291cb (patch)
tree5670dddc0e0cd9f793d419420b61ad0559639497 /policyhandler
parent715fc8a36ac1809cd3e36cbb6cfb7107ebb038ea (diff)
5.1.0 policy-handler - policy-updates from new PDP5.1.0
DCAEGEN2-1851: - policy-handler now supports the policy-update notification from the new policy-engine thru DMaaP MR = no policy-filters - only policy-id values - see README for discoverable config settings of dmaap_mr client = DMaaP MR client has the same flexibility as policy_engine = set the query.timeout to high value like 15000 (default) - requests to DMaaP MR go through a single blocking connection - first catch-up only after draining the policy-updates from DMaaP MR on the first loop - safe parsing of messages from DMaaP MR - policy-engine changed the data type for policy-version field from int to string that is expected to have the semver value - related change to deployment-handler (DCAEGEN2-2085) has to be deployed to handle the non-numeric policyVersion - on new PDP API: http /policy_latest and policy-updates return the new data from the new PDP API with the following fields added/renamed by the policy-handler to keep other policy related parts intact in R4-R6 (see pdp_api/policy_utils.py) * policyName = policy_id + "." + policyVersion.replace(".","-") + ".xml" * policyVersion = str(metadata["policy-version"]) * "config" - is the renamed "properties" from the new PDP API response - enabled the /catch_up and the periodic auto-catch-up for the new PDP API - enabled GET /policies_latest - returns the latest policies for the deployed components - POST /policies_latest - still disabled since no support for the policy-filters is provided for the new PDP API - fixed hiding the Authorization value on comparing the configs - logging of secrets is now sha256 to see whether they changed - added X-ONAP-RequestID to headers the same way as X-ECOMP-RequestID - on policy-update process the removal first, then addition - changed the pool_connections=1 (number of pools) on PDP and DH sides == only a single destination is expected for each - log the exception as fatal into error.log - other minor fixes and refactoring - unit-test coverage 74% - integration testing is requested DCAEGEN2-1976: - policy-handler is enhanced to get user/password from env vars for PDP and DMaaP MR clients and overwriting the Authorization field in https headers received from the discoverable config = to override the Authorization value on policy_engine, set the environment vars $PDP_USER and $PDP_PWD in policy-handler container = to override the Authorization value on dmaap_mr, if using https and user-password authentication, set the environment vars $DMAAP_MR_USER and $DMAAP_MR_PWD in policy-handler container Change-Id: Iad8eab9e20e615a0e0d2822f4735dc64c50aa55c Signed-off-by: Alex Shatov <alexs@att.com> Issue-ID: DCAEGEN2-1851 Issue-ID: DCAEGEN2-1976
Diffstat (limited to 'policyhandler')
-rw-r--r--policyhandler/config.py56
-rw-r--r--policyhandler/deploy_handler.py28
-rw-r--r--policyhandler/discovery.py6
-rw-r--r--policyhandler/onap/audit.py51
-rw-r--r--policyhandler/pdp_api/dmaap_mr.py202
-rw-r--r--policyhandler/pdp_api/pdp_consts.py6
-rw-r--r--policyhandler/pdp_api/policy_listener.py176
-rw-r--r--policyhandler/pdp_api/policy_matcher.py93
-rw-r--r--policyhandler/pdp_api/policy_rest.py331
-rw-r--r--policyhandler/pdp_api/policy_updates.py87
-rw-r--r--policyhandler/pdp_api/policy_utils.py32
-rw-r--r--policyhandler/pdp_api_v0/policy_listener.py8
-rw-r--r--policyhandler/pdp_api_v0/policy_matcher.py22
-rw-r--r--policyhandler/pdp_api_v0/policy_rest.py17
-rw-r--r--policyhandler/pdp_api_v0/policy_updates.py38
-rw-r--r--policyhandler/pdp_api_v0/policy_utils.py8
-rw-r--r--policyhandler/policy_receiver.py15
-rw-r--r--policyhandler/policy_updater.py33
-rw-r--r--policyhandler/service_activator.py14
-rw-r--r--policyhandler/utils.py15
-rw-r--r--policyhandler/web_server.py14
21 files changed, 1029 insertions, 223 deletions
diff --git a/policyhandler/config.py b/policyhandler/config.py
index f8c425a..25ae3a5 100644
--- a/policyhandler/config.py
+++ b/policyhandler/config.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
"""read and use the config"""
+import base64
import copy
import json
import logging
@@ -137,6 +138,7 @@ class Config(object):
FIELD_WSERVICE_PORT = "wservice_port"
FIELD_TLS = "tls"
FIELD_POLICY_ENGINE = "policy_engine"
+ DMAAP_MR = "dmaap_mr"
POOL_CONNECTIONS = "pool_connections"
DEPLOY_HANDLER = "deploy_handler"
THREAD_POOL_SIZE = "thread_pool_size"
@@ -155,6 +157,11 @@ class Config(object):
SERVICE_ACTIVATOR = "service_activator"
MODE_OF_OPERATION = "mode_of_operation"
PDP_API_VERSION = "PDP_API_VERSION"
+ QUERY_TIMEOUT = "timeout"
+ PDP_USER = "PDP_USER"
+ PDP_PWD = "PDP_PWD"
+ DMAAP_MR_USER = "DMAAP_MR_USER"
+ DMAAP_MR_PWD = "DMAAP_MR_PWD"
system_name = SERVICE_NAME_POLICY_HANDLER
wservice_port = 25577
@@ -165,6 +172,8 @@ class Config(object):
tls_server_cert_file = None
tls_private_key_file = None
tls_server_ca_chain_file = None
+ _pdp_authorization = None
+ _dmaap_mr_authorization = None
_local_config = Settings()
discovered_config = Settings()
@@ -254,6 +263,18 @@ class Config(object):
Config._pdp_api_version = os.environ.get(
Config.PDP_API_VERSION, loaded_config.get(Config.PDP_API_VERSION.lower()))
+ pdp_user = os.environ.get(Config.PDP_USER)
+ pdp_pwd = os.environ.get(Config.PDP_PWD)
+ if pdp_user and pdp_pwd:
+ Config._pdp_authorization = "Basic {}".format(base64.b64encode(
+ ("{}:{}".format(pdp_user, pdp_pwd)).encode()).decode("utf-8"))
+
+ dmaap_mr_user = os.environ.get(Config.DMAAP_MR_USER)
+ dmaap_mr_pwd = os.environ.get(Config.DMAAP_MR_PWD)
+ if dmaap_mr_user and dmaap_mr_pwd:
+ Config._dmaap_mr_authorization = "Basic {}".format(base64.b64encode(
+ ("{}:{}".format(dmaap_mr_user, dmaap_mr_pwd)).encode()).decode("utf-8"))
+
local_config = loaded_config.get(Config.SERVICE_NAME_POLICY_HANDLER, {})
Config.system_name = local_config.get(Config.FIELD_SYSTEM, Config.system_name)
@@ -262,6 +283,26 @@ class Config(object):
Config._local_config.set_config(local_config, auto_commit=True)
@staticmethod
+ def _overwrite_discovered_config(audit, discovered_config):
+ """replace the secrets in discovered_config with data from environment"""
+ changes = []
+ if Config._pdp_authorization:
+ pdp_cfg = discovered_config.get("policy_engine", {})
+ if pdp_cfg.get("url", "").lower().startswith("https:"):
+ pdp_cfg.get("headers", {})["Authorization"] = Config._pdp_authorization
+ changes.append("pdp_authorization")
+
+ if Config._dmaap_mr_authorization:
+ dmaap_mr_cfg = discovered_config.get("dmaap_mr", {})
+ if dmaap_mr_cfg.get("url", "").lower().startswith("https:"):
+ dmaap_mr_cfg.get("headers", {})["Authorization"] = Config._dmaap_mr_authorization
+ changes.append("dmaap_mr_authorization")
+
+ if changes:
+ _LOGGER.info(audit.info("overwritten discovered config: {}".format(", ".join(changes))))
+
+
+ @staticmethod
def discover(audit):
"""bring the config settings from the discovery service"""
discovery_key = Config.system_name
@@ -269,14 +310,17 @@ class Config(object):
new_config = DiscoveryClient.get_value(audit, discovery_key)
if not new_config or not isinstance(new_config, dict):
- _LOGGER.warning("unexpected config from discovery: %s", new_config)
+ _LOGGER.warning(audit.warn("unexpected config from discovery: {}".format(new_config)))
return
- _LOGGER.debug("loaded config from discovery(%s): %s",
- discovery_key, Audit.json_dumps(new_config))
+ _LOGGER.debug(audit.debug("loaded config from discovery({}): {}".format(
+ discovery_key, Audit.json_dumps(new_config))))
+ discovered_config = new_config.get(Config.SERVICE_NAME_POLICY_HANDLER)
+
+ Config._overwrite_discovered_config(audit, discovered_config)
- Config.discovered_config.set_config(new_config.get(Config.SERVICE_NAME_POLICY_HANDLER))
- _LOGGER.info("config from discovery: %s", Config.discovered_config)
+ Config.discovered_config.set_config(discovered_config)
+ _LOGGER.info(audit.info("config from discovery: {}".format(Config.discovered_config)))
@staticmethod
diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py
index a127e54..997ec3e 100644
--- a/policyhandler/deploy_handler.py
+++ b/policyhandler/deploy_handler.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -25,8 +25,7 @@ import requests
from .config import Config, Settings
from .discovery import DiscoveryClient
-from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode,
- AuditResponseCode, Metrics)
+from .onap.audit import AuditHttpCode, AuditResponseCode, Metrics
from .policy_consts import (CATCH_UP, LATEST_POLICIES, POLICIES,
POLICY_FILTER_MATCHES, POLICY_FILTERS,
REMOVED_POLICIES, TARGET_ENTITY)
@@ -172,10 +171,10 @@ class DeployHandler(object):
changed, pool_size = DeployHandler._settings.get_by_key(Config.POOL_CONNECTIONS, 10)
if changed:
DeployHandler._requests_session.mount(
- 'https://', requests.adapters.HTTPAdapter(pool_connections=pool_size,
+ 'https://', requests.adapters.HTTPAdapter(pool_connections=1,
pool_maxsize=pool_size))
DeployHandler._requests_session.mount(
- 'http://', requests.adapters.HTTPAdapter(pool_connections=pool_size,
+ 'http://', requests.adapters.HTTPAdapter(pool_connections=1,
pool_maxsize=pool_size))
_, config_dh = DeployHandler._settings.get_by_key(Config.DEPLOY_HANDLER)
@@ -301,7 +300,8 @@ class DeployHandler(object):
metrics = Metrics(aud_parent=audit, targetEntity="{} policy_update".format(target_entity),
targetServiceName=url)
- headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id}
+
+ headers = metrics.put_request_id_into_headers()
log_action = "put to {} at {}".format(target_entity, url)
log_data = "msg={} headers={}, params={}, timeout_in_secs={}, custom_kwargs({})".format(
@@ -330,7 +330,7 @@ class DeployHandler(object):
else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
error_msg = "failed to {} {}: {} {}".format(
log_action, type(ex).__name__, str(ex), log_data)
- _LOGGER.exception(error_msg)
+ _LOGGER.exception(metrics.fatal(error_msg))
metrics.set_http_status_code(error_code)
audit.set_http_status_code(error_code)
metrics.metrics(error_msg)
@@ -371,7 +371,7 @@ class DeployHandler(object):
metrics = Metrics(aud_parent=audit,
targetEntity="{} get_deployed_policies".format(target_entity),
targetServiceName=url)
- headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id}
+ headers = metrics.put_request_id_into_headers()
log_action = "get from {} at {}".format(target_entity, url)
log_data = "headers={}, params={}, timeout_in_secs={}, custom_kwargs({})".format(
@@ -387,7 +387,7 @@ class DeployHandler(object):
metrics.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
audit.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
metrics.metrics(error_msg)
- return None, None
+ return {"error": "failed to retrieve policies from deployment-handler"}, None, None
res = None
try:
@@ -399,11 +399,11 @@ class DeployHandler(object):
else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
error_msg = "failed to {} {}: {} {}".format(
log_action, type(ex).__name__, str(ex), log_data)
- _LOGGER.exception(error_msg)
+ _LOGGER.exception(metrics.fatal(error_msg))
metrics.set_http_status_code(error_code)
audit.set_http_status_code(error_code)
metrics.metrics(error_msg)
- return None, None
+ return {"error": "failed to retrieve policies from deployment-handler"}, None, None
metrics.set_http_status_code(res.status_code)
audit.set_http_status_code(res.status_code)
@@ -414,7 +414,7 @@ class DeployHandler(object):
if res.status_code != requests.codes.ok:
_LOGGER.error(log_line)
- return None, None
+ return {"error": "failed to retrieve policies from deployment-handler"}, None, None
result = res.json() or {}
DeployHandler._server_instance_changed(result, metrics)
@@ -426,10 +426,10 @@ class DeployHandler(object):
_LOGGER.warning(audit.warn(
"found no deployed policies or policy-filters: {}".format(log_line),
error_code=AuditResponseCode.DATA_ERROR))
- return policies, policy_filters
+ return {"warning": "got no deployed policies"}, None, None
_LOGGER.info(log_line)
- return policies, policy_filters
+ return None, policies, policy_filters
@staticmethod
def _server_instance_changed(result, metrics):
diff --git a/policyhandler/discovery.py b/policyhandler/discovery.py
index 83f54ac..b6b6bbd 100644
--- a/policyhandler/discovery.py
+++ b/policyhandler/discovery.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -85,7 +85,7 @@ class DiscoveryClient(object):
else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
error_msg = ("failed {}/{} to {} {}: {}".format(status_code, error_code, log_line,
type(ex).__name__, str(ex)))
- _LOGGER.exception(error_msg)
+ _LOGGER.exception(metrics.fatal(error_msg))
metrics.set_http_status_code(error_code)
audit.set_http_status_code(error_code)
metrics.metrics(error_msg)
@@ -136,7 +136,7 @@ class DiscoveryClient(object):
else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
error_msg = ("failed {}/{} to {} {}: {}".format(status_code, error_code, log_line,
type(ex).__name__, str(ex)))
- _LOGGER.exception(error_msg)
+ _LOGGER.exception(metrics.fatal(error_msg))
metrics.set_http_status_code(error_code)
audit.set_http_status_code(error_code)
metrics.metrics(error_msg)
diff --git a/policyhandler/onap/audit.py b/policyhandler/onap/audit.py
index 3c09c16..269dfd8 100644
--- a/policyhandler/onap/audit.py
+++ b/policyhandler/onap/audit.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@
"""
import copy
+import hashlib
import json
import os
import re
@@ -41,6 +42,7 @@ from .health import Health
from .process_info import ProcessInfo
REQUEST_X_ECOMP_REQUESTID = "X-ECOMP-RequestID"
+REQUEST_X_ONAP_REQUESTID = "X-ONAP-RequestID"
REQUEST_REMOTE_ADDR = "Remote-Addr"
REQUEST_HOST = "Host"
HOSTNAME = "HOSTNAME"
@@ -118,7 +120,7 @@ class AuditResponseCode(Enum):
class _Audit(object):
"""put the audit object on stack per each initiating request in the system
- :request_id: is the X-ECOMP-RequestID for tracing
+ :request_id: is the X-ONAP-RequestID or X-ECOMP-RequestID for tracing
:req_message: is the request message string for logging
@@ -172,7 +174,7 @@ class _Audit(object):
"""create audit object per each request in the system
:job_name: is the name of the audit job for health stats
- :request_id: is the X-ECOMP-RequestID for tracing
+ :request_id: is the X-ONAP-RequestID or X-ECOMP-RequestID for tracing
:req_message: is the request message string for logging
:kwargs: - put any request related params into kwargs
"""
@@ -184,6 +186,12 @@ class _Audit(object):
self.max_http_status_code = 0
self._lock = threading.Lock()
+ def put_request_id_into_headers(self, headers=None):
+ """when sending message out - put the request_id into headers"""
+ headers = headers or {}
+ headers[REQUEST_X_ONAP_REQUESTID] = self.request_id
+ headers[REQUEST_X_ECOMP_REQUESTID] = self.request_id
+ return headers
@staticmethod
def register_item_health(health_name, health_getter=None):
@@ -241,6 +249,8 @@ class _Audit(object):
def set_http_status_code(self, http_status_code):
"""accumulate the highest(worst) http status code"""
+ if http_status_code is None:
+ http_status_code = AuditHttpCode.SERVER_INTERNAL_ERROR.value
with self._lock:
if self.max_http_status_code < AuditHttpCode.SERVER_INTERNAL_ERROR.value:
self.max_http_status_code = max(http_status_code, self.max_http_status_code)
@@ -308,9 +318,7 @@ class _Audit(object):
"""debug+error - the warn level of logging"""
all_kwargs = self.merge_all_kwargs(**kwargs)
- if error_code and isinstance(error_code, AuditResponseCode):
- all_kwargs[ERROR_CODE] = error_code.value
- all_kwargs[ERROR_DESCRIPTION] = AuditResponseCode.get_human_text(error_code)
+ self._set_error_code_in_kwargs(error_code, all_kwargs)
_Audit._logger_debug.warn(log_line, **all_kwargs)
_Audit._logger_error.warn(log_line, **all_kwargs)
@@ -320,9 +328,7 @@ class _Audit(object):
"""debug+error - the error level of logging"""
all_kwargs = self.merge_all_kwargs(**kwargs)
- if error_code and isinstance(error_code, AuditResponseCode):
- all_kwargs[ERROR_CODE] = error_code.value
- all_kwargs[ERROR_DESCRIPTION] = AuditResponseCode.get_human_text(error_code)
+ self._set_error_code_in_kwargs(error_code, all_kwargs)
_Audit._logger_debug.error(log_line, **all_kwargs)
_Audit._logger_error.error(log_line, **all_kwargs)
@@ -332,25 +338,32 @@ class _Audit(object):
"""debug+error - the fatal level of logging"""
all_kwargs = self.merge_all_kwargs(**kwargs)
- if error_code and isinstance(error_code, AuditResponseCode):
- all_kwargs[ERROR_CODE] = error_code.value
- all_kwargs[ERROR_DESCRIPTION] = AuditResponseCode.get_human_text(error_code)
+ self._set_error_code_in_kwargs(error_code, all_kwargs)
_Audit._logger_debug.fatal(log_line, **all_kwargs)
_Audit._logger_error.fatal(log_line, **all_kwargs)
return log_line
+ def _set_error_code_in_kwargs(self, error_code, all_kwargs):
+ """set the error code and description in kwargs for logging"""
+ if not error_code or not isinstance(error_code, AuditResponseCode):
+ error_code = AuditResponseCode.UNKNOWN_ERROR
+ all_kwargs[ERROR_CODE] = error_code.value
+ all_kwargs[ERROR_DESCRIPTION] = AuditResponseCode.get_human_text(error_code)
+
@staticmethod
def hide_secrets(obj):
"""hides the known secret field values of the dictionary"""
if not isinstance(obj, dict):
return obj
- for key in obj:
+ for key, val in obj.items():
if key.lower() in [HEADER_CLIENTAUTH, HEADER_AUTHORIZATION]:
- obj[key] = "*"
- elif isinstance(obj[key], dict):
- obj[key] = _Audit.hide_secrets(obj[key])
+ hval = hashlib.sha256()
+ hval.update(val.encode())
+ obj[key] = "***({})***".format(hval.hexdigest())
+ elif isinstance(val, dict):
+ obj[key] = _Audit.hide_secrets(val)
return obj
@@ -375,7 +388,7 @@ class Audit(_Audit):
"""create audit object per each request in the system
:job_name: is the name of the audit job for health stats
- :request_id: is the X-ECOMP-RequestID for tracing
+ :request_id: is the X-ONAP-RequestID or X-ECOMP-RequestID for tracing
:req_message: is the request message string for logging
:aud_parent: is the parent Audit - used for sub-query metrics to other systems
:kwargs: - put any request related params into kwargs
@@ -388,7 +401,9 @@ class Audit(_Audit):
headers = self.kwargs.get("headers", {})
if headers:
if not self.request_id:
- self.request_id = headers.get(REQUEST_X_ECOMP_REQUESTID)
+ self.request_id = headers.get(REQUEST_X_ONAP_REQUESTID,
+ headers.get(REQUEST_X_ECOMP_REQUESTID))
+
self.kwargs.setdefault(AUDIT_IPADDRESS, headers.get(REQUEST_REMOTE_ADDR))
self.kwargs.setdefault(AUDIT_SERVER, headers.get(REQUEST_HOST))
diff --git a/policyhandler/pdp_api/dmaap_mr.py b/policyhandler/pdp_api/dmaap_mr.py
new file mode 100644
index 0000000..2d4d468
--- /dev/null
+++ b/policyhandler/pdp_api/dmaap_mr.py
@@ -0,0 +1,202 @@
+# ================================================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+
+"""policy-client communicates with policy-engine thru REST API"""
+
+import copy
+import json
+from threading import Lock
+
+import requests
+
+from ..config import Config, Settings
+from ..onap.audit import AuditHttpCode, AuditResponseCode, Metrics
+from ..utils import Utils
+
+_LOGGER = Utils.get_logger(__file__)
+
+class DmaapMr(object):
+ """using the http API to policy-engine"""
+ _lazy_inited = False
+ DEFAULT_TIMEOUT_IN_SECS = 60
+
+ _lock = Lock()
+ _settings = Settings(Config.DMAAP_MR)
+
+ _requests_session = None
+ _long_polling = False
+ _target_entity = None
+ _url = None
+ _query = {}
+ _headers = None
+ _custom_kwargs = {}
+ _timeout_in_secs = DEFAULT_TIMEOUT_IN_SECS
+
+ @staticmethod
+ def _init(audit):
+ """init static config"""
+ DmaapMr._custom_kwargs = {}
+ tls_ca_mode = None
+
+ if not DmaapMr._requests_session:
+ DmaapMr._requests_session = requests.Session()
+ DmaapMr._requests_session.mount(
+ 'https://', requests.adapters.HTTPAdapter(pool_connections=1, pool_maxsize=1,
+ pool_block=True))
+ DmaapMr._requests_session.mount(
+ 'http://', requests.adapters.HTTPAdapter(pool_connections=1, pool_maxsize=1,
+ pool_block=True))
+
+
+ _, config = DmaapMr._settings.get_by_key(Config.DMAAP_MR)
+ if config:
+ DmaapMr._url = config.get("url")
+ DmaapMr._headers = config.get("headers", {})
+ DmaapMr._query = copy.deepcopy(config.get("query", {}))
+ if DmaapMr._query.get(Config.QUERY_TIMEOUT, 0) < 1000:
+ DmaapMr._query[Config.QUERY_TIMEOUT] = 15000
+
+ DmaapMr._target_entity = config.get("target_entity", Config.DMAAP_MR)
+
+ tls_ca_mode = config.get(Config.TLS_CA_MODE)
+ DmaapMr._custom_kwargs = Config.get_requests_kwargs(tls_ca_mode)
+ DmaapMr._timeout_in_secs = config.get(Config.TIMEOUT_IN_SECS)
+ if not DmaapMr._timeout_in_secs or DmaapMr._timeout_in_secs < 1:
+ DmaapMr._timeout_in_secs = DmaapMr.DEFAULT_TIMEOUT_IN_SECS
+
+ _LOGGER.info(
+ audit.info(("config DMaaP MR({}) url({}) query({}) headers({}) "
+ "tls_ca_mode({}) custom_kwargs({}) timeout_in_secs({}): {}").format(
+ DmaapMr._target_entity, DmaapMr._url,
+ Metrics.json_dumps(DmaapMr._query),
+ Metrics.json_dumps(DmaapMr._headers), tls_ca_mode,
+ json.dumps(DmaapMr._custom_kwargs), DmaapMr._timeout_in_secs,
+ DmaapMr._settings)))
+
+ DmaapMr._settings.commit_change()
+ DmaapMr._lazy_inited = True
+
+ @staticmethod
+ def reconfigure(audit):
+ """reconfigure"""
+ with DmaapMr._lock:
+ DmaapMr._settings.set_config(Config.discovered_config)
+ if not DmaapMr._settings.is_changed():
+ DmaapMr._settings.commit_change()
+ return False
+
+ DmaapMr._lazy_inited = False
+ DmaapMr._long_polling = False
+ DmaapMr._init(audit)
+ return True
+
+ @staticmethod
+ def _lazy_init(audit):
+ """init static config"""
+ if DmaapMr._lazy_inited:
+ return
+
+ with DmaapMr._lock:
+ if DmaapMr._lazy_inited:
+ return
+
+ DmaapMr._settings.set_config(Config.discovered_config)
+ DmaapMr._long_polling = False
+ DmaapMr._init(audit)
+
+ @staticmethod
+ def get_policy_updates(audit):
+ """
+ get from DMaaP MR - returns json list of stringified messages
+
+ example [
+ "{\"deployed-policies\":[
+ {\"policy-type\":\"onap.policies.monitoring.cdap.tca.hi.lo.app\",
+ \"policy-type-version\":\"1.0.0\",
+ \"policy-id\":\"onap.scaleout.tca\",
+ \"policy-version\":\"2.2.2\",
+ \"success-count\":3,
+ \"failure-count\":0
+ }],
+ \"undeployed-policies\":[
+ {\"policy-type\":\"onap.policies.monitoring.cdap.tca.hi.lo.app\",
+ \"policy-type-version\":\"1.0.0\",
+ \"policy-id\":\"onap.scaleout.tca\",
+ \"policy-version\":\"1.0.0\",
+ \"success-count\":3,
+ \"failure-count\":0
+ }]}"
+ ]
+ """
+ DmaapMr._lazy_init(audit)
+
+ if not DmaapMr._url:
+ _LOGGER.error(
+ audit.error("no url for DMaaP MR", error_code=AuditResponseCode.AVAILABILITY_ERROR))
+ audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ return None
+
+ with DmaapMr._lock:
+ target_entity = DmaapMr._target_entity
+ url = DmaapMr._url
+ params = copy.deepcopy(DmaapMr._query) if DmaapMr._long_polling else None
+ headers = copy.deepcopy(DmaapMr._headers)
+ timeout_in_secs = DmaapMr._timeout_in_secs
+ custom_kwargs = copy.deepcopy(DmaapMr._custom_kwargs)
+ DmaapMr._long_polling = True
+
+ metrics = Metrics(aud_parent=audit, targetEntity=target_entity, targetServiceName=url)
+
+ headers = metrics.put_request_id_into_headers(headers)
+
+ log_line = (
+ "get from {} at {} with params={}, headers={}, custom_kwargs({}) timeout_in_secs({})"
+ .format(target_entity, url, json.dumps(params), Metrics.json_dumps(headers),
+ json.dumps(custom_kwargs), timeout_in_secs))
+
+ _LOGGER.info(metrics.metrics_start(log_line))
+
+ res = None
+ try:
+ res = DmaapMr._requests_session.get(url, params=params, headers=headers,
+ timeout=timeout_in_secs, **custom_kwargs)
+
+ except Exception as ex:
+ error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
+ if isinstance(ex, requests.exceptions.RequestException)
+ else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ error_msg = ("failed {}: {} to {}".format(type(ex).__name__, str(ex), log_line))
+
+ _LOGGER.exception(metrics.fatal(error_msg))
+ metrics.set_http_status_code(error_code)
+ audit.set_http_status_code(error_code)
+ metrics.metrics(error_msg)
+ return None
+
+ log_line = "response {} from {}: text={} headers={}".format(
+ res.status_code, log_line, res.text, Metrics.json_dumps(dict(res.headers.items())))
+ _LOGGER.info(log_line)
+
+ metrics.set_http_status_code(res.status_code)
+ audit.set_http_status_code(res.status_code)
+ metrics.metrics(log_line)
+
+ policy_updates = None
+ if res.status_code == requests.codes.ok:
+ policy_updates = res.json()
+
+ return policy_updates
diff --git a/policyhandler/pdp_api/pdp_consts.py b/policyhandler/pdp_api/pdp_consts.py
index 2337456..32f80f3 100644
--- a/policyhandler/pdp_api/pdp_consts.py
+++ b/policyhandler/pdp_api/pdp_consts.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -33,3 +33,7 @@ PDP_REQ_ONAP_NAME = "ONAPName" # always "DCAE"
PDP_REQ_ONAP_COMPONENT = "ONAPComponent"
PDP_REQ_ONAP_INSTANCE = "ONAPInstance"
PDP_REQ_RESOURCE = "resource"
+
+# fields from policy-update notification
+DEPLOYED_POLICIES = "deployed-policies"
+UNDEPLOYED_POLICIES = "undeployed-policies"
diff --git a/policyhandler/pdp_api/policy_listener.py b/policyhandler/pdp_api/policy_listener.py
index 9fa4695..0d33785 100644
--- a/policyhandler/pdp_api/policy_listener.py
+++ b/policyhandler/pdp_api/policy_listener.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -17,39 +17,173 @@
"""
policy-listener communicates with policy-engine
-to receive push notifications
+to receive push notifications through DMaaP MR
on updates and removal of policies.
-on receiving the policy-notifications, the policy-receiver
+on receiving the policy-notifications, the policy-listener
passes the notifications to policy-updater
"""
+import json
import os
+from threading import Event, Lock, Thread
-from ..utils import ToBeImplementedException, Utils
+from ..onap.audit import Audit, AuditResponseCode
+from ..utils import Utils
+from .dmaap_mr import DmaapMr
+from .pdp_consts import (DEPLOYED_POLICIES, PDP_METADATA, PDP_POLICY_ID,
+ PDP_POLICY_VERSION, UNDEPLOYED_POLICIES)
_LOGGER = Utils.get_logger(__file__)
-class PolicyListener(object):
- """listener to PolicyEngine"""
+class PolicyListener(Thread):
+ """listener to DMaaP MR"""
PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__)))
+ SLEEP_BEFORE_RESTARTING = 30
- def __init__(self, *_):
+ def __init__(self, audit, policy_updater):
"""listener to receive the policy notifications from PolicyEngine"""
- _LOGGER.info("to_be_implemented")
- raise ToBeImplementedException()
+ Thread.__init__(self, name="policy_listener", daemon=True)
- def reconfigure(self, _):
- """configure and reconfigure the listener"""
- _LOGGER.info("to_be_implemented")
- raise ToBeImplementedException()
+ self._policy_updater = policy_updater
+ self._lock = Lock()
+ self._run_event = Event()
+ self._keep_running = True
+ self._first_loop = True
+
+ self._dmaap_mr = None
+ self.reconfigure(audit)
+
+ def reconfigure(self, audit):
+ """configure and reconfigure the DMaaP MR"""
+ reconfigured = DmaapMr.reconfigure(audit)
+ if reconfigured and not self._first_loop:
+ with self._lock:
+ self._first_loop = True
+ return reconfigured
def run(self):
- """listen on web-socket and pass the policy notifications to policy-updater"""
- _LOGGER.info("to_be_implemented")
- raise ToBeImplementedException()
-
- def shutdown(self, _):
- """Shutdown the policy-listener"""
- _LOGGER.info("to_be_implemented")
- raise ToBeImplementedException()
+ """listen on DMaaP MR and pass the policy notifications to policy-updater"""
+ _LOGGER.info("starting policy_listener...")
+ delayed_restarting = False
+ while True:
+ if not self._get_keep_running():
+ break
+
+ if delayed_restarting:
+ _LOGGER.info(
+ "going to sleep for %s secs before restarting policy-notifications",
+ PolicyListener.SLEEP_BEFORE_RESTARTING)
+
+ self._run_event.clear()
+ self._run_event.wait(PolicyListener.SLEEP_BEFORE_RESTARTING)
+ if not self._get_keep_running():
+ break
+
+ audit = Audit(job_name="policy_update",
+ req_message="waiting for policy-notifications...",
+ retry_get_config=True)
+
+ policy_updates = DmaapMr.get_policy_updates(audit)
+
+ if not self._get_keep_running():
+ audit.audit_done(result="exiting policy_listener")
+ break
+
+ delayed_restarting = not audit.is_success()
+ if self._first_loop:
+ policy_updater = None
+ with self._lock:
+ if self._first_loop:
+ self._first_loop = False
+ policy_updater = self._policy_updater
+ if policy_updater is not None:
+ audit.req_message = "first catch_up"
+ _LOGGER.info(audit.info("first catch_up - ignoring policy-updates: {}"
+ .format(json.dumps(policy_updates))))
+ policy_updater.catch_up(audit)
+ elif not policy_updates:
+ _LOGGER.info(audit.info(
+ "no policy-updates: {}".format(json.dumps(policy_updates))))
+ audit.audit_done(result="no policy-updates")
+ else:
+ self._on_policy_update_message(audit, policy_updates)
+
+ _LOGGER.info("exit policy_listener")
+
+ def _get_keep_running(self):
+ """thread-safe check whether to continue running"""
+ with self._lock:
+ keep_running = self._keep_running
+ return keep_running
+
+ def _on_policy_update_message(self, audit, policy_updates):
+ """received the notification from PDP"""
+ try:
+ _LOGGER.info("Received notification message: %s", json.dumps(policy_updates))
+ if not policy_updates:
+ return
+
+ policies_updated = []
+
+ for idx, pdp_update_msg in enumerate(policy_updates):
+ pdp_update_msg = Utils.safe_json_parse(pdp_update_msg)
+
+ if not pdp_update_msg or not isinstance(pdp_update_msg, dict):
+ _LOGGER.warning(audit.warn(
+ "unexpected message from PDP: {}".format(json.dumps(pdp_update_msg)),
+ error_code=AuditResponseCode.DATA_ERROR))
+ continue
+
+ _LOGGER.debug("raw policy_update[%s]: %s", idx, json.dumps(pdp_update_msg))
+
+ deployed_policies = [
+ {PDP_METADATA: {PDP_POLICY_ID: p_deployed.get(PDP_POLICY_ID),
+ PDP_POLICY_VERSION: p_deployed.get(PDP_POLICY_VERSION)}}
+ for p_deployed in pdp_update_msg.get(DEPLOYED_POLICIES, [])
+ if (p_deployed.get(PDP_POLICY_ID) is not None
+ and p_deployed.get(PDP_POLICY_VERSION) is not None)]
+
+ undeployed_policies = [
+ {PDP_METADATA: {PDP_POLICY_ID: p_undeployed.get(PDP_POLICY_ID),
+ PDP_POLICY_VERSION: p_undeployed.get(PDP_POLICY_VERSION)}}
+ for p_undeployed in pdp_update_msg.get(UNDEPLOYED_POLICIES, [])
+ if (p_undeployed.get(PDP_POLICY_ID) is not None
+ and p_undeployed.get(PDP_POLICY_VERSION) is not None)]
+
+ if not deployed_policies and not undeployed_policies:
+ _LOGGER.warning(audit.warn(
+ "no policy deployed or undeployed: {}".format(json.dumps(pdp_update_msg)),
+ error_code=AuditResponseCode.DATA_ERROR))
+ continue
+
+ policy_update = {DEPLOYED_POLICIES: deployed_policies,
+ UNDEPLOYED_POLICIES: undeployed_policies}
+ _LOGGER.info(audit.info("policy_update[{}]: {}"
+ .format(idx, json.dumps(policy_update))))
+
+ policies_updated.append(policy_update)
+
+ if not policies_updated:
+ _LOGGER.warning(audit.warn(
+ "erroneous notification from PDP: {}".format(json.dumps(policy_updates)),
+ error_code=AuditResponseCode.DATA_ERROR))
+ return
+
+ with self._lock:
+ policy_updater = self._policy_updater
+ if policy_updater is not None:
+ policy_updater.policy_update(audit, policies_updated)
+ except Exception as ex:
+ error_msg = "crash {} {} at {}: {}".format(type(ex).__name__, str(ex),
+ "on_policy_update_message",
+ json.dumps(policy_updates))
+ _LOGGER.exception(audit.fatal(error_msg))
+
+ def shutdown(self, audit):
+ """Shutdown the policy_listener"""
+ _LOGGER.info(audit.info("shutdown policy_listener - no waiting..."))
+ with self._lock:
+ self._keep_running = False
+ self._policy_updater = None
+ self._run_event.set()
diff --git a/policyhandler/pdp_api/policy_matcher.py b/policyhandler/pdp_api/policy_matcher.py
index 57258c3..2972fb8 100644
--- a/policyhandler/pdp_api/policy_matcher.py
+++ b/policyhandler/pdp_api/policy_matcher.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -19,7 +19,98 @@
import os
+from ..deploy_handler import DeployHandler, PolicyUpdateMessage
+from ..policy_consts import (ERRORED_POLICIES, LATEST_POLICIES, POLICY_BODY,
+ POLICY_VERSIONS)
+from ..utils import Utils
+from .pdp_consts import POLICY_VERSION
+from .policy_rest import PolicyRest
+
+_LOGGER = Utils.get_logger(__file__)
class PolicyMatcher(object):
"""policy-matcher - static class"""
+ PENDING_UPDATE = "pending_update"
PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__)))
+
+ @staticmethod
+ def build_catch_up_message(audit, deployed_policies, _=None):
+ """find the latest policies from policy-engine for the deployed policies"""
+
+ if not deployed_policies:
+ error_txt = "no deployed policies"
+ _LOGGER.warning(error_txt)
+ return {"error": error_txt}, None
+
+ pdp_response = PolicyRest.get_latest_policies(audit, policy_ids=list(deployed_policies))
+
+ if not audit.is_success():
+ error_txt = "failed to retrieve policies from policy-engine"
+ _LOGGER.warning(error_txt)
+ return {"error": error_txt}, None
+
+ latest_policies = pdp_response.get(LATEST_POLICIES, {})
+ errored_policies = pdp_response.get(ERRORED_POLICIES, {})
+
+ latest_policies, changed_policies = PolicyMatcher._match_policies(
+ latest_policies, deployed_policies)
+
+ errored_policies = dict((policy_id, policy)
+ for (policy_id, policy) in errored_policies.items()
+ if deployed_policies.get(policy_id, {}).get(POLICY_VERSIONS))
+
+ removed_policies = dict(
+ (policy_id, True)
+ for (policy_id, deployed_policy) in deployed_policies.items()
+ if deployed_policy.get(POLICY_VERSIONS)
+ and policy_id not in latest_policies
+ and policy_id not in errored_policies
+ )
+
+ return ({LATEST_POLICIES: latest_policies, ERRORED_POLICIES: errored_policies},
+ PolicyUpdateMessage(changed_policies, removed_policies))
+
+ @staticmethod
+ def match_to_deployed_policies(audit, policies_updated, policies_removed):
+ """match the policies_updated, policies_removed versus deployed policies"""
+ _, deployed_policies, _ = DeployHandler.get_deployed_policies(audit)
+ if not audit.is_success():
+ return {}, {}, {}
+
+ _, changed_policies = PolicyMatcher._match_policies(policies_updated, deployed_policies)
+
+ policies_removed = dict((policy_id, policy)
+ for (policy_id, policy) in policies_removed.items()
+ if deployed_policies.get(policy_id, {}).get(POLICY_VERSIONS))
+
+ return changed_policies, policies_removed, {}
+
+
+ @staticmethod
+ def _match_policies(policies, deployed_policies):
+ """
+ Match policies to deployed policies by policy_id.
+
+ Also calculates the policies that changed in comparison to deployed policies
+ """
+ matching_policies = {}
+ changed_policies = {}
+
+ policies = policies or {}
+ deployed_policies = deployed_policies or {}
+
+ for (policy_id, policy) in policies.items():
+ new_version = policy.get(POLICY_BODY, {}).get(POLICY_VERSION)
+ deployed_policy = deployed_policies.get(policy_id)
+
+ if deployed_policy:
+ matching_policies[policy_id] = policy
+
+ policy_changed = (deployed_policy and new_version
+ and (deployed_policy.get(PolicyMatcher.PENDING_UPDATE)
+ or {new_version} ^
+ deployed_policy.get(POLICY_VERSIONS, {}).keys()))
+ if policy_changed:
+ changed_policies[policy_id] = policy
+
+ return matching_policies, changed_policies
diff --git a/policyhandler/pdp_api/policy_rest.py b/policyhandler/pdp_api/policy_rest.py
index 14d9296..0b33ccd 100644
--- a/policyhandler/pdp_api/policy_rest.py
+++ b/policyhandler/pdp_api/policy_rest.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -20,16 +20,19 @@
import copy
import json
import os
+import time
import urllib.parse
+from multiprocessing.dummy import Pool as ThreadPool
from threading import Lock
import requests
from ..config import Config, Settings
-from ..onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode,
- AuditResponseCode, Metrics)
+from ..onap.audit import AuditHttpCode, AuditResponseCode, Metrics
+from ..policy_consts import (ERRORED_POLICIES, LATEST_POLICIES, POLICY_BODY,
+ POLICY_ID, POLICY_NAMES)
from ..utils import Utils
-from .pdp_consts import PDP_POLICIES
+from .pdp_consts import PDP_POLICIES, POLICY_NAME, POLICY_VERSION
from .policy_utils import PolicyUtils
_LOGGER = Utils.get_logger(__file__)
@@ -37,11 +40,15 @@ _LOGGER = Utils.get_logger(__file__)
class PolicyRest(object):
"""using the http API to policy-engine"""
PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__)))
- _lazy_inited = False
+ EXPECTED_VERSIONS = "expected_versions"
+ IGNORE_POLICY_NAMES = "ignore_policy_names"
DEFAULT_TIMEOUT_IN_SECS = 60
+ _lazy_inited = False
_lock = Lock()
- _settings = Settings(Config.FIELD_POLICY_ENGINE, Config.POOL_CONNECTIONS)
+ _settings = Settings(Config.FIELD_POLICY_ENGINE, Config.POOL_CONNECTIONS,
+ Config.THREAD_POOL_SIZE,
+ Config.POLICY_RETRY_COUNT, Config.POLICY_RETRY_SLEEP)
_target_entity = None
_requests_session = None
@@ -49,6 +56,9 @@ class PolicyRest(object):
_url_pdp_decision = None
_headers = None
_custom_kwargs = {}
+ _thread_pool_size = 4
+ _policy_retry_count = 1
+ _policy_retry_sleep = 0
_timeout_in_secs = DEFAULT_TIMEOUT_IN_SECS
@staticmethod
@@ -63,10 +73,10 @@ class PolicyRest(object):
changed, pool_size = PolicyRest._settings.get_by_key(Config.POOL_CONNECTIONS, 20)
if changed:
PolicyRest._requests_session.mount(
- 'https://', requests.adapters.HTTPAdapter(pool_connections=pool_size,
+ 'https://', requests.adapters.HTTPAdapter(pool_connections=1,
pool_maxsize=pool_size))
PolicyRest._requests_session.mount(
- 'http://', requests.adapters.HTTPAdapter(pool_connections=pool_size,
+ 'http://', requests.adapters.HTTPAdapter(pool_connections=1,
pool_maxsize=pool_size))
_, config = PolicyRest._settings.get_by_key(Config.FIELD_POLICY_ENGINE)
@@ -77,6 +87,15 @@ class PolicyRest(object):
PolicyRest._url, config.get("path_decision", "/decision/v1/"))
PolicyRest._headers = config.get("headers", {})
PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE)
+ _, PolicyRest._thread_pool_size = PolicyRest._settings.get_by_key(
+ Config.THREAD_POOL_SIZE, 4)
+ if PolicyRest._thread_pool_size < 2:
+ PolicyRest._thread_pool_size = 2
+
+ _, PolicyRest._policy_retry_count = PolicyRest._settings.get_by_key(
+ Config.POLICY_RETRY_COUNT, 1)
+ _, PolicyRest._policy_retry_sleep = PolicyRest._settings.get_by_key(
+ Config.POLICY_RETRY_SLEEP, 0)
tls_ca_mode = config.get(Config.TLS_CA_MODE)
PolicyRest._custom_kwargs = Config.get_requests_kwargs(tls_ca_mode)
@@ -121,13 +140,13 @@ class PolicyRest(object):
PolicyRest._init()
@staticmethod
- def _pdp_get_decision(audit, pdp_req):
- """Communication with the policy-engine"""
+ def _pdp_get_decision(audit, policy_ids):
+ """get policies from the policy-engine by policy-ids"""
if not PolicyRest._url:
_LOGGER.error(
audit.error("no url for PDP", error_code=AuditResponseCode.AVAILABILITY_ERROR))
audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
- return None
+ return None, None
with PolicyRest._lock:
session = PolicyRest._requests_session
@@ -137,9 +156,11 @@ class PolicyRest(object):
headers = copy.deepcopy(PolicyRest._headers)
custom_kwargs = copy.deepcopy(PolicyRest._custom_kwargs)
+ pdp_req = PolicyUtils.gen_req_to_pdp(policy_ids)
+
metrics = Metrics(aud_parent=audit, targetEntity=target_entity, targetServiceName=url)
- headers[REQUEST_X_ECOMP_REQUESTID] = metrics.request_id
+ headers = metrics.put_request_id_into_headers(headers)
log_action = "post to {} at {}".format(target_entity, url)
log_data = "msg={} headers={}, custom_kwargs({}) timeout_in_secs({})".format(
@@ -159,11 +180,11 @@ class PolicyRest(object):
else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
error_msg = ("failed {}: {} to {}".format(type(ex).__name__, str(ex), log_line))
- _LOGGER.exception(error_msg)
+ _LOGGER.exception(metrics.fatal(error_msg))
metrics.set_http_status_code(error_code)
audit.set_http_status_code(error_code)
metrics.metrics(error_msg)
- return None
+ return (error_code, None)
log_line = "response {} from {}: text={} headers={}".format(
res.status_code, log_line, res.text,
@@ -174,40 +195,282 @@ class PolicyRest(object):
audit.set_http_status_code(res.status_code)
metrics.metrics(log_line)
- policy_bodies = None
+ latest_policies = None
if res.status_code == requests.codes.ok:
- policy_bodies = res.json().get(PDP_POLICIES)
+ policy_bodies = res.json().get(PDP_POLICIES, {})
+ latest_policies = dict((policy_id, PolicyUtils.convert_to_policy(policy))
+ for (policy_id, policy) in policy_bodies.items())
+
+ return res.status_code, latest_policies
- return policy_bodies
@staticmethod
def get_latest_policy(aud_policy_id):
"""safely try retrieving the latest policy for the policy_id from the policy-engine"""
- audit, policy_id, _, _ = aud_policy_id
+ audit, policy_id, expected_versions, ignore_policy_names = aud_policy_id
+ str_metrics = "policy_id({0}), expected_versions({1}) ignore_policy_names({2})".format(
+ policy_id, json.dumps(expected_versions), json.dumps(ignore_policy_names))
+
try:
- PolicyRest._lazy_init()
+ return PolicyRest._get_latest_policy(
+ audit, policy_id, expected_versions, ignore_policy_names, str_metrics)
+
+ except Exception as ex:
+ error_msg = ("{0}: crash {1} {2} at {3}: {4}"
+ .format(audit.request_id, type(ex).__name__, str(ex),
+ "get_latest_policy", str_metrics))
+
+ _LOGGER.exception(error_msg)
+ audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+ audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ return None
- pdp_req = PolicyUtils.gen_req_to_pdp(policy_id)
- policy_bodies = PolicyRest._pdp_get_decision(audit, pdp_req)
- log_line = "looking for policy_id({}) in policy_bodies: {}".format(
- policy_id, json.dumps(policy_bodies))
- _LOGGER.info(log_line)
+ @staticmethod
+ def _get_latest_policy(audit, policy_id,
+ expected_versions, ignore_policy_names, str_metrics):
+ """retry several times getting the latest policy for the policy_id from the policy-engine"""
+ PolicyRest._lazy_init()
+ latest_policy = None
+ status_code = 0
+ retry_get_config = audit.kwargs.get("retry_get_config")
- latest_policy = None
- if policy_bodies and policy_id in policy_bodies:
- latest_policy = PolicyUtils.convert_to_policy(policy_bodies[policy_id])
+ for retry in range(1, PolicyRest._policy_retry_count + 1):
+ _LOGGER.debug("try(%s) retry_get_config(%s): %s", retry, retry_get_config, str_metrics)
+
+ done, removed, latest_policy, status_code = PolicyRest._get_latest_policy_once(
+ audit, policy_id, expected_versions, ignore_policy_names)
- if not PolicyUtils.validate_policy(latest_policy):
+ if removed:
audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value)
- _LOGGER.error(audit.error(
- "received invalid policy from PDP: {}".format(json.dumps(latest_policy)),
- error_code=AuditResponseCode.DATA_ERROR))
+ return None
+
+ if done or not retry_get_config or not PolicyRest._policy_retry_sleep:
+ break
+
+ if retry == PolicyRest._policy_retry_count:
+ _LOGGER.error(
+ audit.error("gave up retrying after #{} for policy_id({}) from PDP {}"
+ .format(retry, policy_id, PolicyRest._url_pdp_decision),
+ error_code=AuditResponseCode.DATA_ERROR))
+ break
+
+ _LOGGER.warning(audit.warn(
+ "will retry({}) for policy_id({}) in {} secs from PDP {}".format(
+ retry, policy_id, PolicyRest._policy_retry_sleep, PolicyRest._url_pdp_decision),
+ error_code=AuditResponseCode.DATA_ERROR))
+ time.sleep(PolicyRest._policy_retry_sleep)
+
+ audit.set_http_status_code(status_code)
+ if not PolicyUtils.validate_policy(latest_policy):
+ audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value)
+ _LOGGER.error(audit.error(
+ "received invalid policy from PDP: {}".format(json.dumps(latest_policy)),
+ error_code=AuditResponseCode.DATA_ERROR))
+
+ return latest_policy
+
+ @staticmethod
+ def _get_latest_policy_once(audit, policy_id, expected_versions, ignore_policy_names):
+ """single attempt to get the latest policy for the policy_id from the policy-engine"""
+
+ status_code, latest_policies = PolicyRest._pdp_get_decision(audit, policy_id)
+
+ if (ignore_policy_names and not expected_versions and not latest_policies
+ and AuditHttpCode.HTTP_OK.value == status_code):
+ return True, True, None, status_code
+
+ log_line = "{} looking for policy_id({}) in latest_policies: {}".format(
+ status_code, policy_id, json.dumps(latest_policies))
+ _LOGGER.info(log_line)
+
+ latest_policy = (latest_policies or {}).get(policy_id)
+
+ log_error = ""
+ if latest_policy:
+ policy_body = latest_policy.get(POLICY_BODY, {})
+ policy_version = policy_body.get(POLICY_VERSION)
+ policy_name = policy_body.get(POLICY_NAME)
+ if expected_versions and policy_version not in expected_versions:
+ log_error = ("received unexpected policy version({}) instead of ({})"
+ " from PDP for policy_id={}: {}"
+ .format(policy_version, json.dumps(expected_versions),
+ policy_id, json.dumps(latest_policy)))
+ elif ignore_policy_names and policy_name in ignore_policy_names:
+ log_error = ("unexpectedly received policy version({}) from PDP"
+ " for policy_id={}: {}. to ignore-policy-names {}"
+ .format(policy_version, policy_id,
+ json.dumps(latest_policy),
+ json.dumps(ignore_policy_names)))
+
+ if not latest_policy or log_error:
+ _LOGGER.error(audit.error(
+ log_error or "received unexpected policy data({}) from PDP for policy_id={}: {}"
+ .format(json.dumps(latest_policy), policy_id, json.dumps(latest_policies)),
+ error_code=AuditResponseCode.DATA_ERROR))
+ latest_policy = None
+
+ done = bool(latest_policy or audit.is_serious_error(status_code))
+ return done, False, latest_policy, status_code
+
+ @staticmethod
+ def get_latest_updated_policies(audit, updated_policies, removed_policies):
+ """safely try retrieving the latest policies for the list of policy_names"""
+ if not updated_policies and not removed_policies:
+ return None, None
+
+ policies_updated = [(policy_id, policy.get(POLICY_BODY, {}).get(POLICY_VERSION))
+ for policy_id, policy in updated_policies.items()]
+ policies_removed = [(policy_id, policy.get(POLICY_NAMES, {}))
+ for policy_id, policy in removed_policies.items()]
+
+ if not policies_updated and not policies_removed:
+ return None, None
+
+ str_metrics = "policies_updated[{0}]: {1} policies_removed[{2}]: {3}".format(
+ len(policies_updated), json.dumps(policies_updated),
+ len(policies_removed), json.dumps(policies_removed))
+
+ try:
+ return PolicyRest._get_latest_updated_policies(
+ audit, str_metrics, policies_updated, policies_removed)
+
+ except Exception as ex:
+ error_msg = ("{0}: crash {1} {2} at {3}: {4}"
+ .format(audit.request_id, type(ex).__name__, str(ex),
+ "get_latest_updated_policies", str_metrics))
+
+ _LOGGER.exception(error_msg)
+ audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+ audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ return None, None
+
+ @staticmethod
+ def _get_latest_updated_policies(audit, str_metrics, policies_updated, policies_removed):
+ """Get the latest policies of the list of policy_names from the policy-engine"""
+ PolicyRest._lazy_init()
+ metrics_total = Metrics(
+ aud_parent=audit,
+ targetEntity="{0} total get_latest_updated_policies".format(PolicyRest._target_entity),
+ targetServiceName=PolicyRest._url_pdp_decision)
+
+ metrics_total.metrics_start("get_latest_updated_policies {0}".format(str_metrics))
+ _LOGGER.debug(str_metrics)
+
+ policies_to_find = {}
+ for (policy_id, policy_version) in policies_updated:
+ if not policy_id or policy_version is None:
+ continue
+ policy = policies_to_find.get(policy_id)
+ if not policy:
+ policies_to_find[policy_id] = {
+ POLICY_ID: policy_id,
+ PolicyRest.EXPECTED_VERSIONS: {policy_version: True},
+ PolicyRest.IGNORE_POLICY_NAMES: {}
+ }
+ continue
+ policy[PolicyRest.EXPECTED_VERSIONS][policy_version] = True
+
+ for (policy_id, policy_names) in policies_removed:
+ if not policy_id:
+ continue
+ policy = policies_to_find.get(policy_id)
+ if not policy:
+ policies_to_find[policy_id] = {
+ POLICY_ID: policy_id,
+ PolicyRest.IGNORE_POLICY_NAMES: policy_names
+ }
+ continue
+ policy[PolicyRest.IGNORE_POLICY_NAMES].update(policy_names)
+
+ apns = [(audit, policy_id,
+ policy_to_find.get(PolicyRest.EXPECTED_VERSIONS),
+ policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES))
+ for (policy_id, policy_to_find) in policies_to_find.items()]
+
+ policies = None
+ apns_length = len(apns)
+ _LOGGER.debug("apns_length(%s) policies_to_find %s", apns_length,
+ json.dumps(policies_to_find))
+
+ if apns_length == 1:
+ policies = [PolicyRest.get_latest_policy(apns[0])]
+ else:
+ pool = ThreadPool(min(PolicyRest._thread_pool_size, apns_length))
+ policies = pool.map(PolicyRest.get_latest_policy, apns)
+ pool.close()
+ pool.join()
+
+ metrics_total.metrics("result({}) get_latest_updated_policies {}: {} {}"
+ .format(apns_length, str_metrics,
+ len(policies), json.dumps(policies)))
+
+ updated_policies = dict((policy[POLICY_ID], policy)
+ for policy in policies
+ if policy and policy.get(POLICY_ID))
+
+ removed_policies = dict((policy_id, True)
+ for (policy_id, policy_to_find) in policies_to_find.items()
+ if not policy_to_find.get(PolicyRest.EXPECTED_VERSIONS)
+ and policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES)
+ and policy_id not in updated_policies)
+
+ errored_policies = dict((policy_id, policy_to_find)
+ for (policy_id, policy_to_find) in policies_to_find.items()
+ if policy_id not in updated_policies
+ and policy_id not in removed_policies)
+
+ _LOGGER.debug(
+ "result(%s) updated_policies %s, removed_policies %s, errored_policies %s",
+ apns_length, json.dumps(updated_policies), json.dumps(removed_policies),
+ json.dumps(errored_policies))
+
+ if errored_policies:
+ audit.set_http_status_code(AuditHttpCode.DATA_ERROR.value)
+ audit.error(
+ "errored_policies in PDP: {}".format(json.dumps(errored_policies)),
+ error_code=AuditResponseCode.DATA_ERROR)
+
+ return updated_policies, removed_policies
+
+
+ @staticmethod
+ def get_latest_policies(audit, policy_ids=None):
+ """Get the latest policies by policy-ids from the policy-engine"""
+ result = {}
+ str_policy_ids = json.dumps(policy_ids or [])
+
+ try:
+ PolicyRest._lazy_init()
+ if policy_ids:
+ _, latest_policies = PolicyRest._pdp_get_decision(audit, policy_ids)
+
+ if not latest_policies:
+ audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value)
+ _LOGGER.warning(audit.warn(
+ "received no policies from PDP for policy_ids {}"
+ .format(str_policy_ids), error_code=AuditResponseCode.DATA_ERROR))
+ return latest_policies
+
+ valid_policies = {}
+ errored_policies = {}
+ for (policy_id, policy) in latest_policies.items():
+ if PolicyUtils.validate_policy(policy):
+ valid_policies[policy_id] = policy
+ else:
+ errored_policies[policy_id] = policy
+
+ result[LATEST_POLICIES] = valid_policies
+ result[ERRORED_POLICIES] = errored_policies
+
+ _LOGGER.debug("got policies for policy_ids: %s. result: %s",
+ str_policy_ids, json.dumps(result))
+ return result
- return latest_policy
except Exception as ex:
- error_msg = ("{}: get_latest_policy({}) crash {}: {}"
- .format(audit.request_id, policy_id, type(ex).__name__, str(ex)))
+ error_msg = ("{}: crash {} {} at {}: {}"
+ .format(audit.request_id, type(ex).__name__, str(ex),
+ "get_latest_policies", str_policy_ids))
_LOGGER.exception(error_msg)
audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
diff --git a/policyhandler/pdp_api/policy_updates.py b/policyhandler/pdp_api/policy_updates.py
index eb3c3d1..15f5b0a 100644
--- a/policyhandler/pdp_api/policy_updates.py
+++ b/policyhandler/pdp_api/policy_updates.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -17,10 +17,13 @@
"""policy-updates accumulates the policy-update notifications from PDP"""
+import json
import os
-from ..utils import Utils, ToBeImplementedException
-
+from ..policy_consts import POLICY_BODY, POLICY_ID, POLICY_NAMES
+from ..utils import Utils
+from .pdp_consts import DEPLOYED_POLICIES, POLICY_NAME, UNDEPLOYED_POLICIES
+from .policy_utils import PolicyUtils
_LOGGER = Utils.get_logger(__file__)
@@ -30,9 +33,12 @@ class PolicyUpdates(object):
def __init__(self):
"""init and reset"""
+ self._audit = None
+ self._policies_updated = {}
+ self._policies_removed = {}
def reset(self):
- """resets the state"""
+ """resets the state - removes the pending policy-updates"""
self.__init__()
def pop_policy_updates(self):
@@ -40,10 +46,71 @@ class PolicyUpdates(object):
Returns the consolidated (audit, policies_updated, policies_removed)
and resets the state
"""
- _LOGGER.info("to_be_implemented")
- return None, None, None
+ if not self._audit:
+ return None, None, None
+
+ audit = self._audit
+ policies_updated = self._policies_updated
+ policies_removed = self._policies_removed
+
+ self.reset()
+
+ return audit, policies_updated, policies_removed
+
+
+ def push_policy_updates(self, audit, multi_policies_updated):
+ """
+ consolidate the new policies_updated, policies_removed to existing ones
+
+ receives
+ :multi_policies_updated: as [
+ {DEPLOYED_POLICIES: [{PDP_METADATA: {POLICY_ID: <policy_id>,
+ POLICY_VERSION: <policy_version>}}, ...],
+ UNDEPLOYED_POLICIES: [{PDP_METADATA: {POLICY_ID: <policy_id>,
+ POLICY_VERSION: <policy_version>}}, ...]
+ }, ...]
+ """
+ for p_single_updated in multi_policies_updated:
+ for p_undeployed in p_single_updated.get(UNDEPLOYED_POLICIES, []):
+ policy = PolicyUtils.convert_to_policy(p_undeployed)
+ if not policy:
+ continue
+ policy_id = policy.get(POLICY_ID)
+ policy_name = policy.get(POLICY_BODY, {}).get(POLICY_NAME)
+
+ if policy_id in self._policies_removed:
+ policy = self._policies_removed[policy_id]
+
+ if POLICY_NAMES not in policy:
+ policy[POLICY_NAMES] = {}
+ policy[POLICY_NAMES][policy_name] = True
+ self._policies_removed[policy_id] = policy
+
+ for p_deployed in p_single_updated.get(DEPLOYED_POLICIES, []):
+ policy = PolicyUtils.convert_to_policy(p_deployed)
+ if not policy:
+ continue
+ policy_id = policy.get(POLICY_ID)
+ policy_name = policy.get(POLICY_BODY, {}).get(POLICY_NAME)
+
+ self._policies_updated[policy_id] = policy
+
+ rm_policy_names = self._policies_removed.get(policy_id, {}).get(POLICY_NAMES)
+ if rm_policy_names and policy_name in rm_policy_names:
+ del rm_policy_names[policy_name]
+
+ req_message = ("policy-update notification - updated[{}], removed[{}]"
+ .format(len(self._policies_updated),
+ len(self._policies_removed)))
+
+ if not self._audit:
+ self._audit = audit
+ else:
+ audit.audit_done(result="policy-updates queued to request_id({})"
+ .format(self._audit.request_id))
+ self._audit.req_message = req_message
- def push_policy_updates(self, *_):
- """consolidate the new policies_updated, policies_removed to existing ones"""
- _LOGGER.info("to_be_implemented")
- raise ToBeImplementedException()
+ _LOGGER.info(
+ "pending(%s) for %s policies_updated %s policies_removed %s",
+ self._audit.request_id, req_message,
+ json.dumps(self._policies_updated), json.dumps(self._policies_removed))
diff --git a/policyhandler/pdp_api/policy_utils.py b/policyhandler/pdp_api/policy_utils.py
index 1d06d14..f2ed522 100644
--- a/policyhandler/pdp_api/policy_utils.py
+++ b/policyhandler/pdp_api/policy_utils.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -30,14 +30,19 @@ class PolicyUtils(object):
"""policy-client utils"""
@staticmethod
- def gen_req_to_pdp(policy_id):
- """request to get a single policy from pdp by policy_id"""
+ def gen_req_to_pdp(policy_ids):
+ """request to get policies from pdp by policy_id list or a single value"""
+ if not policy_ids:
+ policy_ids = []
+ elif not isinstance(policy_ids, list):
+ policy_ids = [policy_ids]
+
return {
PDP_REQ_ONAP_NAME: "DCAE",
PDP_REQ_ONAP_COMPONENT: Audit.service_name,
PDP_REQ_ONAP_INSTANCE: Audit.SERVICE_INSTANCE_UUID,
"action": "configure",
- PDP_REQ_RESOURCE: {PDP_POLICY_ID: [policy_id]}
+ PDP_REQ_RESOURCE: {PDP_POLICY_ID: policy_ids}
}
@staticmethod
@@ -52,7 +57,7 @@ class PolicyUtils(object):
"version": "1.0.0",
"metadata": {
"policy-id": "onap.scaleout.tca",
- "policy-version": 1,
+ "policy-version": "1.2.3",
"description": "The scaleout policy for vDNS"
},
"properties": {
@@ -73,13 +78,13 @@ class PolicyUtils(object):
{
"policy_id": "onap.scaleout.tca",
"policy_body": {
- "policyName": "onap.scaleout.tca.1.xml",
- "policyVersion": 1,
+ "policyName": "onap.scaleout.tca.1-2-3.xml",
+ "policyVersion": "1.2.3",
"type": "onap.policies.monitoring.cdap.tca.hi.lo.app",
"version": "1.0.0",
"metadata": {
"policy-id": "onap.scaleout.tca",
- "policy-version": 1,
+ "policy-version": "1.2.3",
"description": "The scaleout policy for vDNS"
},
"config": {
@@ -97,19 +102,20 @@ class PolicyUtils(object):
}
}
"""
- if not policy_body or not policy_body.get(PDP_PROPERTIES):
+ if not policy_body:
return None
pdp_metadata = policy_body.get(PDP_METADATA, {})
policy_id = pdp_metadata.get(PDP_POLICY_ID)
policy_version = pdp_metadata.get(PDP_POLICY_VERSION)
- if not policy_id or not policy_version:
+ if not policy_id or policy_version is None:
return None
- policy_body[POLICY_NAME] = "{}.{}.xml".format(policy_id, policy_version)
+ policy_body[POLICY_NAME] = "{}.{}.xml".format(policy_id, policy_version.replace(".", "-"))
policy_body[POLICY_VERSION] = str(policy_version)
- policy_body[POLICY_CONFIG] = policy_body[PDP_PROPERTIES]
- del policy_body[PDP_PROPERTIES]
+ if PDP_PROPERTIES in policy_body:
+ policy_body[POLICY_CONFIG] = policy_body[PDP_PROPERTIES]
+ del policy_body[PDP_PROPERTIES]
return {POLICY_ID:policy_id, POLICY_BODY:policy_body}
diff --git a/policyhandler/pdp_api_v0/policy_listener.py b/policyhandler/pdp_api_v0/policy_listener.py
index 67e4c49..7525e4d 100644
--- a/policyhandler/pdp_api_v0/policy_listener.py
+++ b/policyhandler/pdp_api_v0/policy_listener.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,7 +16,7 @@
#
"""
-policy-listener communicates with policy-engine
+policy_listener communicates with policy-engine
thru web-socket to receive push notifications
on updates and removal of policies.
@@ -298,8 +298,8 @@ class PolicyListener(Thread):
def shutdown(self, audit):
- """Shutdown the policy-listener"""
- _LOGGER.info(audit.info("shutdown policy-listener"))
+ """Shutdown the policy_listener"""
+ _LOGGER.info(audit.info("shutdown policy_listener"))
with self._lock:
self._keep_running = False
diff --git a/policyhandler/pdp_api_v0/policy_matcher.py b/policyhandler/pdp_api_v0/policy_matcher.py
index 357af49..deb6619 100644
--- a/policyhandler/pdp_api_v0/policy_matcher.py
+++ b/policyhandler/pdp_api_v0/policy_matcher.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -38,24 +38,6 @@ class PolicyMatcher(object):
PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__)))
@staticmethod
- def get_deployed_policies(audit):
- """get the deployed policies and policy-filters"""
- deployed_policies, deployed_policy_filters = DeployHandler.get_deployed_policies(audit)
-
- if audit.is_not_found():
- warning_txt = "got no deployed policies or policy-filters"
- _LOGGER.warning(warning_txt)
- return {"warning": warning_txt}, None, None
-
- if not audit.is_success() or (not deployed_policies and not deployed_policy_filters):
- error_txt = "failed to retrieve policies from deployment-handler"
- _LOGGER.error(error_txt)
- return {"error": error_txt}, None, None
-
- return None, deployed_policies, deployed_policy_filters
-
-
- @staticmethod
def build_catch_up_message(audit, deployed_policies, deployed_policy_filters):
"""
find the latest policies from policy-engine for the deployed policies and policy-filters
@@ -135,7 +117,7 @@ class PolicyMatcher(object):
@staticmethod
def match_to_deployed_policies(audit, policies_updated, policies_removed):
"""match the policies_updated, policies_removed versus deployed policies"""
- deployed_policies, deployed_policy_filters = DeployHandler.get_deployed_policies(audit)
+ _, deployed_policies, deployed_policy_filters = DeployHandler.get_deployed_policies(audit)
if not audit.is_success():
return {}, {}, {}
diff --git a/policyhandler/pdp_api_v0/policy_rest.py b/policyhandler/pdp_api_v0/policy_rest.py
index c59625e..30fc043 100644
--- a/policyhandler/pdp_api_v0/policy_rest.py
+++ b/policyhandler/pdp_api_v0/policy_rest.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -28,8 +28,7 @@ from threading import Lock
import requests
from ..config import Config, Settings
-from ..onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode,
- AuditResponseCode, Metrics)
+from ..onap.audit import AuditHttpCode, AuditResponseCode, Metrics
from ..policy_consts import (ERRORED_POLICIES, LATEST_POLICIES, POLICY_BODY,
POLICY_FILTER, POLICY_FILTERS, POLICY_ID,
POLICY_NAMES)
@@ -84,10 +83,10 @@ class PolicyRest(object):
changed, pool_size = PolicyRest._settings.get_by_key(Config.POOL_CONNECTIONS, 20)
if changed:
PolicyRest._requests_session.mount(
- 'https://', requests.adapters.HTTPAdapter(pool_connections=pool_size,
+ 'https://', requests.adapters.HTTPAdapter(pool_connections=1,
pool_maxsize=pool_size))
PolicyRest._requests_session.mount(
- 'http://', requests.adapters.HTTPAdapter(pool_connections=pool_size,
+ 'http://', requests.adapters.HTTPAdapter(pool_connections=1,
pool_maxsize=pool_size))
_, config = PolicyRest._settings.get_by_key(Config.FIELD_POLICY_ENGINE)
@@ -159,7 +158,7 @@ class PolicyRest(object):
_LOGGER.error(
audit.error("no url for PDP", error_code=AuditResponseCode.AVAILABILITY_ERROR))
audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
- return None
+ return None, None
with PolicyRest._lock:
session = PolicyRest._requests_session
@@ -171,7 +170,7 @@ class PolicyRest(object):
metrics = Metrics(aud_parent=audit, targetEntity=target_entity, targetServiceName=url)
- headers[REQUEST_X_ECOMP_REQUESTID] = metrics.request_id
+ headers = metrics.put_request_id_into_headers(headers)
log_action = "post to {} at {}".format(target_entity, url)
log_data = "msg={} headers={}, custom_kwargs({}) timeout_in_secs({})".format(
@@ -191,7 +190,7 @@ class PolicyRest(object):
else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
error_msg = ("failed {}: {} to {}".format(type(ex).__name__, str(ex), log_line))
- _LOGGER.exception(error_msg)
+ _LOGGER.exception(metrics.fatal(error_msg))
metrics.set_http_status_code(error_code)
audit.set_http_status_code(error_code)
metrics.metrics(error_msg)
@@ -412,7 +411,7 @@ class PolicyRest(object):
policies_to_find = {}
for (policy_id, policy_version) in policies_updated:
- if not policy_id or not policy_version or not policy_version.isdigit():
+ if not policy_id or policy_version is None or not policy_version.isdigit():
continue
policy = policies_to_find.get(policy_id)
if not policy:
diff --git a/policyhandler/pdp_api_v0/policy_updates.py b/policyhandler/pdp_api_v0/policy_updates.py
index eafdca2..ac68f4a 100644
--- a/policyhandler/pdp_api_v0/policy_updates.py
+++ b/policyhandler/pdp_api_v0/policy_updates.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -40,7 +40,7 @@ class PolicyUpdates(object):
self._policies_removed = {}
def reset(self):
- """resets the state"""
+ """resets the state - removes the pending policy-updates"""
self.__init__()
def pop_policy_updates(self):
@@ -62,25 +62,12 @@ class PolicyUpdates(object):
def push_policy_updates(self, policies_updated, policies_removed):
"""consolidate the new policies_updated, policies_removed to existing ones"""
- for policy_body in policies_updated:
- policy_name = policy_body.get(POLICY_NAME)
- policy = PolicyUtils.convert_to_policy(policy_body)
- if not policy:
- continue
- policy_id = policy.get(POLICY_ID)
-
- self._policies_updated[policy_id] = policy
-
- rm_policy_names = self._policies_removed.get(policy_id, {}).get(POLICY_NAMES)
- if rm_policy_names and policy_name in rm_policy_names:
- del rm_policy_names[policy_name]
-
for policy_body in policies_removed:
- policy_name = policy_body.get(POLICY_NAME)
policy = PolicyUtils.convert_to_policy(policy_body)
if not policy:
continue
policy_id = policy.get(POLICY_ID)
+ policy_name = policy_body.get(POLICY_NAME)
if policy_id in self._policies_removed:
policy = self._policies_removed[policy_id]
@@ -90,16 +77,27 @@ class PolicyUpdates(object):
policy[POLICY_NAMES][policy_name] = True
self._policies_removed[policy_id] = policy
+ for policy_body in policies_updated:
+ policy = PolicyUtils.convert_to_policy(policy_body)
+ if not policy:
+ continue
+ policy_id = policy.get(POLICY_ID)
+ policy_name = policy_body.get(POLICY_NAME)
+
+ self._policies_updated[policy_id] = policy
+
+ rm_policy_names = self._policies_removed.get(policy_id, {}).get(POLICY_NAMES)
+ if rm_policy_names and policy_name in rm_policy_names:
+ del rm_policy_names[policy_name]
+
req_message = ("policy-update notification - updated[{0}], removed[{1}]"
.format(len(self._policies_updated),
len(self._policies_removed)))
if not self._audit:
- self._audit = Audit(job_name="policy_update",
- req_message=req_message,
+ self._audit = Audit(job_name="policy_update", req_message=req_message,
retry_get_config=True)
- else:
- self._audit.req_message = req_message
+ self._audit.req_message = req_message
_LOGGER.info(
"pending(%s) for %s policies_updated %s policies_removed %s",
diff --git a/policyhandler/pdp_api_v0/policy_utils.py b/policyhandler/pdp_api_v0/policy_utils.py
index d337665..2cbb22c 100644
--- a/policyhandler/pdp_api_v0/policy_utils.py
+++ b/policyhandler/pdp_api_v0/policy_utils.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -63,7 +63,7 @@ class PolicyUtils(object):
return None
policy_name = policy_body.get(POLICY_NAME)
policy_version = policy_body.get(POLICY_VERSION)
- if not policy_name or not policy_version:
+ if not policy_name or policy_version is None:
return None
policy_id = PolicyUtils.extract_policy_id(policy_name)
if not policy_id:
@@ -81,7 +81,7 @@ class PolicyUtils(object):
for policy_body in policy_bodies:
policy_name = policy_body.get(POLICY_NAME)
policy_version = policy_body.get(POLICY_VERSION)
- if not policy_name or not policy_version or not policy_version.isdigit():
+ if not policy_name or policy_version is None or not policy_version.isdigit():
continue
if expected_versions and policy_version not in expected_versions:
continue
@@ -108,7 +108,7 @@ class PolicyUtils(object):
continue
policy_id = policy.get(POLICY_ID)
policy_version = policy.get(POLICY_BODY, {}).get(POLICY_VERSION)
- if not policy_id or not policy_version or not policy_version.isdigit():
+ if not policy_id or policy_version is None or not policy_version.isdigit():
continue
if (policy_id not in policies
or int(policy_version) > int(policies[policy_id][POLICY_BODY][POLICY_VERSION])):
diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py
index d949c4b..091c2c3 100644
--- a/policyhandler/policy_receiver.py
+++ b/policyhandler/policy_receiver.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -24,13 +24,15 @@ on receiving the policy-notifications, the policy-receiver
passes the notifications to policy-updater
"""
+from .config import Config
from .service_activator import ServiceActivator
+
class PolicyReceiver(object):
"""
policy-receiver - static singleton wrapper around two threads
policy_updater - master thread for all scheduled actions
- policy_listener - listens to policy-engine through web-socket
+ policy_listener - listens to policy-engine through DMaaP MR or web-socket
"""
_policy_updater = None
_policy_listener = None
@@ -47,9 +49,9 @@ class PolicyReceiver(object):
def _close_listener(audit):
"""stop the notification-handler"""
if PolicyReceiver._policy_listener:
- policy_receiver = PolicyReceiver._policy_listener
+ policy_listener = PolicyReceiver._policy_listener
PolicyReceiver._policy_listener = None
- policy_receiver.shutdown(audit)
+ policy_listener.shutdown(audit)
@staticmethod
def shutdown(audit):
@@ -98,4 +100,7 @@ class PolicyReceiver(object):
PolicyReceiver._policy_updater.start()
- PolicyReceiver.catch_up(audit)
+ if Config.is_pdp_api_default():
+ audit.audit_done(result="will catch_up after draining the policy-update queue")
+ else:
+ PolicyReceiver.catch_up(audit)
diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py
index 3fcde40..42c5ee6 100644
--- a/policyhandler/policy_updater.py
+++ b/policyhandler/policy_updater.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -40,7 +40,7 @@ class PolicyUpdater(Thread):
self._reconfigure_receiver = on_reconfigure_receiver
self._lock = Lock()
- self._run = Event()
+ self._run_event = Event()
self._settings = Settings(CATCH_UP, Config.RECONFIGURE)
self._catch_up_timer = None
@@ -74,11 +74,11 @@ class PolicyUpdater(Thread):
self._settings.commit_change()
return True
- def policy_update(self, policies_updated, policies_removed):
+ def policy_update(self, *args):
"""enqueue the policy-updates"""
with self._lock:
- self._policy_updates.push_policy_updates(policies_updated, policies_removed)
- self._run.set()
+ self._policy_updates.push_policy_updates(*args)
+ self._run_event.set()
def catch_up(self, audit=None):
"""need to bring the latest policies to DCAE-Controller"""
@@ -89,7 +89,7 @@ class PolicyUpdater(Thread):
"catch_up %s request_id %s",
self._aud_catch_up.req_message, self._aud_catch_up.request_id
)
- self._run.set()
+ self._run_event.set()
def reconfigure(self, audit=None):
"""job to check for and bring in the updated config for policy-handler"""
@@ -100,7 +100,7 @@ class PolicyUpdater(Thread):
"%s request_id %s",
self._aud_reconfigure.req_message, self._aud_reconfigure.request_id
)
- self._run.set()
+ self._run_event.set()
def run(self):
"""wait and run the policy-update in thread"""
@@ -108,10 +108,10 @@ class PolicyUpdater(Thread):
self._run_reconfigure_timer()
while True:
_LOGGER.info("waiting for policy-updates...")
- self._run.wait()
+ self._run_event.wait()
with self._lock:
- self._run.clear()
+ self._run_event.clear()
if not self._keep_running():
break
@@ -126,7 +126,7 @@ class PolicyUpdater(Thread):
self._on_policy_update()
- _LOGGER.info("exit policy-updater")
+ _LOGGER.info("exit policy_updater")
def _keep_running(self):
"""thread-safe check whether to continue running"""
@@ -235,7 +235,7 @@ class PolicyUpdater(Thread):
if self._reconfigure_receiver(aud_reconfigure):
need_to_catch_up = True
- changed_configs.append("web-socket")
+ changed_configs.append("policy-receiver")
reconfigure_result = " -- config changed on {} changes: {}".format(
json.dumps(changed_configs), Config.discovered_config)
@@ -246,7 +246,7 @@ class PolicyUpdater(Thread):
Config.discovered_config.commit_change()
aud_reconfigure.audit_done(result=reconfigure_result)
- _LOGGER.info(log_line + reconfigure_result)
+ _LOGGER.info("%s%s", log_line, reconfigure_result)
if need_to_catch_up:
self._pause_catch_up_timer()
@@ -300,8 +300,7 @@ class PolicyUpdater(Thread):
_LOGGER.info(log_line)
self._pause_catch_up_timer()
- (_, policies,
- policy_filters) = pdp_client.PolicyMatcher.get_deployed_policies(aud_catch_up)
+ (_, policies, policy_filters) = DeployHandler.get_deployed_policies(aud_catch_up)
catch_up_message = None
if aud_catch_up.is_not_found():
@@ -422,11 +421,11 @@ class PolicyUpdater(Thread):
def shutdown(self, audit):
- """Shutdown the policy-updater"""
- _LOGGER.info("shutdown policy-updater")
+ """Shutdown the policy_updater"""
+ _LOGGER.info("shutdown policy_updater")
with self._lock:
self._aud_shutdown = audit
- self._run.set()
+ self._run_event.set()
self._stop_timers()
diff --git a/policyhandler/service_activator.py b/policyhandler/service_activator.py
index c1e5b8c..1fab5c6 100644
--- a/policyhandler/service_activator.py
+++ b/policyhandler/service_activator.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -36,8 +36,7 @@ import requests
from .config import Config, Settings
from .discovery import DiscoveryClient
-from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode,
- Metrics)
+from .onap.audit import Audit, AuditHttpCode, Metrics
from .policy_consts import TARGET_ENTITY
from .utils import Utils
@@ -153,16 +152,11 @@ class ServiceActivator(object):
mode_of_operation - whether the service is
active == True or passive == False
based on the current value of the mode_of_operation
-
- temporary for R4 Dublin - passive for new PDP API
"""
active = (ServiceActivator._mode_of_operation is None
or ServiceActivator._mode_of_operation
== ServiceActivator.MODE_OF_OPERATION_ACTIVE)
- if active and Config.is_pdp_api_default():
- active = False
-
if audit:
_LOGGER.info(audit.info("mode_of_operation = {} active = {}".format(
ServiceActivator._mode_of_operation, active)))
@@ -188,7 +182,7 @@ class ServiceActivator(object):
metrics = Metrics(aud_parent=audit,
targetEntity="{} determine_mode_of_operation".format(target_entity),
targetServiceName=url)
- headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id}
+ headers = metrics.put_request_id_into_headers()
log_action = "post to {} at {}".format(target_entity, url)
log_data = "headers={}, json_body={}, timeout_in_secs={}, custom_kwargs({})".format(
@@ -209,7 +203,7 @@ class ServiceActivator(object):
else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
error_msg = "failed to {} {}: {} {}".format(
log_action, type(ex).__name__, str(ex), log_data)
- _LOGGER.exception(error_msg)
+ _LOGGER.exception(metrics.fatal(error_msg))
metrics.set_http_status_code(error_code)
audit.set_http_status_code(error_code)
metrics.metrics(error_msg)
diff --git a/policyhandler/utils.py b/policyhandler/utils.py
index d728e48..685d7d8 100644
--- a/policyhandler/utils.py
+++ b/policyhandler/utils.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -23,11 +23,6 @@ import os
from copy import deepcopy
from typing import Pattern
-class ToBeImplementedException(Exception):
- """exception for to be implemented features of policy-handler"""
- pass
-
-
class Utils(object):
"""general purpose utils"""
_logger = logging.getLogger("policy_handler.utils")
@@ -91,6 +86,14 @@ class Utils(object):
return False
for key, val_1 in body_1.items():
+ val_2 = body_2[key]
+ if isinstance(val_1, str) or isinstance(val_2, str):
+ if val_1 != val_2:
+ Utils._logger.debug("key-values %s != %s",
+ json_dumps({key: val_1}), json_dumps({key: val_2}))
+ return False
+ continue
+
if not Utils.are_the_same(val_1, body_2[key], json_dumps):
return False
return True
diff --git a/policyhandler/web_server.py b/policyhandler/web_server.py
index 9c2656e..f52f18e 100644
--- a/policyhandler/web_server.py
+++ b/policyhandler/web_server.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -18,15 +18,15 @@
"""web-server for policy_handler"""
import json
-from datetime import datetime
import os
import time
+from datetime import datetime
import cherrypy
from . import pdp_client
from .config import Config
-from .deploy_handler import PolicyUpdateMessage
+from .deploy_handler import DeployHandler, PolicyUpdateMessage
from .onap.audit import Audit, AuditHttpCode
from .policy_receiver import PolicyReceiver
from .utils import Utils
@@ -120,7 +120,7 @@ class _PolicyWeb(object):
PolicyWeb.logger.info("%s", req_info)
- result, policies, policy_filters = pdp_client.PolicyMatcher.get_deployed_policies(audit)
+ result, policies, policy_filters = DeployHandler.get_deployed_policies(audit)
if not result:
result, policy_update = pdp_client.PolicyMatcher.build_catch_up_message(
audit, policies, policy_filters)
@@ -184,12 +184,12 @@ class _PolicyWeb(object):
}
}
"""
- if Config.is_pdp_api_default():
- raise cherrypy.HTTPError(404, "temporarily unsupported due to the new pdp API")
-
if cherrypy.request.method == "GET":
return self._get_all_policies_latest()
+ if Config.is_pdp_api_default():
+ raise cherrypy.HTTPError(404, "temporarily unsupported due to the new pdp API")
+
if cherrypy.request.method != "POST":
raise cherrypy.HTTPError(404, "unexpected method {0}".format(cherrypy.request.method))