aboutsummaryrefslogtreecommitdiffstats
path: root/policyhandler
diff options
context:
space:
mode:
authorAlex Shatov <alexs@att.com>2019-04-01 11:32:06 -0400
committerAlex Shatov <alexs@att.com>2019-04-01 11:32:06 -0400
commit9a4d3c5b8dc9c7697275cab38ee45b014dff9e55 (patch)
treed4d55bcc8bc237ee3199d0e6a13f5e7cd95fadea /policyhandler
parentebc1a062328e53e97e4d24ed111534cfc567a809 (diff)
5.0.0 policy-handler - new PDP API or old PDP API4.0.0-ONAPdublin
- in R4 Dublin the policy-engine introduced a totally new API - policy-handler now has a startup option to either use the new PDP API or the old PDP API that was created-updated before the end of 2018 - see README.md and README_pdp_api_v0.md for instructions on how to setup the policy-handler running either with the new PDP API or the old (pdp_api_v0) PDP API - this is a massive refactoring that changed almost all the source files, but kept the old logic when using the old (pdp_api_v0) PDP API - all the code related to PDP API version is split into two subfolders = pdp_api/ contains the new PDP API source code = pdp_api_v0/ contains the old (2018) PDP API source code = pdp_client.py imports from either pdp_api or pdp_api_v0 = the rest of the code is only affected when it needs to branch the logic - logging to policy_handler.log now shows the path of the source file to allow tracing which PDP API is actually used - when the new PDP API is used, the policy-update flow is disabled = passive mode of operation = no web-socket = no periodic catch_up = no policy-filters = reduced web-API - only a single /policy_latest endpoint is available /policies_latest returns 404 /catch_up request is accepted, but ignored - on new PDP API: http /policy_latest returns the new data from the new PDP API with the following fields added by the policy-handler to keep other policy related parts intact in R4 (see pdp_api/policy_utils.py) = "policyName" = policy_id + "." + "policyVersion" + ".xml" = "policyVersion" = str("metadata"."policy-version") = "config" - is the renamed "properties" from the new PDP API response - unit tests are split into two subfolders as well = main/ for the new PDP API testing = pdp_api_v0/ for the old (2018) PDP API - removed the following line from the license text of changed files ECOMP is a trademark and service mark of AT&T Intellectual Property. - the new PDP API is expected to be extended and redesigned in R5 El Alto - on retiring the old PDP API - the intention is to be able to remove the pdp_api_v0/ subfolder and minimal related cleanup of the code that imports that as well as the cleanup of the config.py, etc. Change-Id: Ief9a2ae4541300308caaf97377f4ed051535dbe4 Signed-off-by: Alex Shatov <alexs@att.com> Issue-ID: DCAEGEN2-1128
Diffstat (limited to 'policyhandler')
-rw-r--r--policyhandler/__main__.py14
-rw-r--r--policyhandler/config.py50
-rw-r--r--policyhandler/deploy_handler.py37
-rw-r--r--policyhandler/discovery.py22
-rw-r--r--policyhandler/onap/audit.py34
-rw-r--r--policyhandler/pdp_api/__init__.py30
-rw-r--r--policyhandler/pdp_api/pdp_consts.py35
-rw-r--r--policyhandler/pdp_api/policy_listener.py55
-rw-r--r--policyhandler/pdp_api/policy_matcher.py25
-rw-r--r--policyhandler/pdp_api/policy_rest.py215
-rw-r--r--policyhandler/pdp_api/policy_updates.py49
-rw-r--r--policyhandler/pdp_api/policy_utils.py123
-rw-r--r--policyhandler/pdp_api_v0/__init__.py30
-rw-r--r--policyhandler/pdp_api_v0/pdp_consts.py23
-rw-r--r--policyhandler/pdp_api_v0/policy_listener.py309
-rw-r--r--policyhandler/pdp_api_v0/policy_matcher.py (renamed from policyhandler/policy_matcher.py)45
-rw-r--r--policyhandler/pdp_api_v0/policy_rest.py (renamed from policyhandler/policy_rest.py)147
-rw-r--r--policyhandler/pdp_api_v0/policy_updates.py107
-rw-r--r--policyhandler/pdp_api_v0/policy_utils.py120
-rw-r--r--policyhandler/pdp_client.py29
-rw-r--r--policyhandler/policy_consts.py5
-rw-r--r--policyhandler/policy_receiver.py320
-rw-r--r--policyhandler/policy_updater.py214
-rw-r--r--policyhandler/service_activator.py41
-rw-r--r--policyhandler/step_timer.py18
-rw-r--r--policyhandler/utils.py (renamed from policyhandler/policy_utils.py)127
-rw-r--r--policyhandler/web_server.py20
27 files changed, 1492 insertions, 752 deletions
diff --git a/policyhandler/__main__.py b/policyhandler/__main__.py
index 798a2e1..6a71a15 100644
--- a/policyhandler/__main__.py
+++ b/policyhandler/__main__.py
@@ -14,7 +14,6 @@
# limitations under the License.
# ============LICENSE_END=========================================================
#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
"""
run as server:
@@ -23,22 +22,23 @@
that will invoke this module __main__.py in folder of policyhandler
"""
-import logging
import sys
-from policyhandler import LogWriter
from policyhandler.config import Config
from policyhandler.onap.audit import Audit
-from policyhandler.policy_receiver import PolicyReceiver
-from policyhandler.service_activator import ServiceActivator
-from policyhandler.web_server import PolicyWeb
+from policyhandler.utils import Utils
def run_policy_handler():
"""main run function for policy-handler"""
Config.init_config()
- logger = logging.getLogger("policy_handler")
+ from policyhandler import LogWriter
+ from policyhandler.policy_receiver import PolicyReceiver
+ from policyhandler.service_activator import ServiceActivator
+ from policyhandler.web_server import PolicyWeb
+
+ logger = Utils.get_logger(__file__)
sys.stdout = LogWriter(logger.info)
sys.stderr = LogWriter(logger.error)
diff --git a/policyhandler/config.py b/policyhandler/config.py
index e6e74cc..f8c425a 100644
--- a/policyhandler/config.py
+++ b/policyhandler/config.py
@@ -14,7 +14,6 @@
# limitations under the License.
# ============LICENSE_END=========================================================
#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
"""read and use the config"""
@@ -25,7 +24,7 @@ import logging.config
import os
from .onap.audit import Audit
-from .policy_utils import Utils
+from .utils import Utils
LOGS_DIR = 'logs'
@@ -40,6 +39,8 @@ logging.basicConfig(
'%(threadName)s %(name)s.%(funcName)s: %(message)s'),
datefmt='%Y%m%d_%H%M%S', level=logging.DEBUG)
+_LOGGER = Utils.get_logger(__file__)
+
class Settings(object):
"""settings of module or an application
that is the config filtered by the collection of config-keys.
@@ -127,7 +128,6 @@ class Settings(object):
class Config(object):
"""main config of the application"""
- _logger = logging.getLogger("policy_handler.config")
CONFIG_FILE_PATH = "etc/config.json"
LOGGER_CONFIG_FILE_PATH = "etc/common_logger.config"
SERVICE_NAME_POLICY_HANDLER = "policy_handler"
@@ -154,9 +154,11 @@ class Config(object):
DEFAULT_TIMEOUT_IN_SECS = 60
SERVICE_ACTIVATOR = "service_activator"
MODE_OF_OPERATION = "mode_of_operation"
+ PDP_API_VERSION = "PDP_API_VERSION"
system_name = SERVICE_NAME_POLICY_HANDLER
wservice_port = 25577
+ _pdp_api_version = os.environ.get(PDP_API_VERSION)
consul_url = "http://consul:8500"
consul_timeout_in_secs = DEFAULT_TIMEOUT_IN_SECS
tls_cacert_file = None
@@ -175,7 +177,7 @@ class Config(object):
return None
tls_file_path = os.path.join(cert_directory, file_name)
if not os.path.isfile(tls_file_path) or not os.access(tls_file_path, os.R_OK):
- Config._logger.error("invalid %s: %s", tls_name, tls_file_path)
+ _LOGGER.error("invalid %s: %s", tls_name, tls_file_path)
return None
return tls_file_path
@@ -189,19 +191,19 @@ class Config(object):
Config.tls_server_ca_chain_file = None
if not (tls_config and isinstance(tls_config, dict)):
- Config._logger.info("no tls in config: %s", json.dumps(tls_config))
+ _LOGGER.info("no tls in config: %s", json.dumps(tls_config))
return
cert_directory = tls_config.get("cert_directory")
if not (cert_directory and isinstance(cert_directory, str)):
- Config._logger.warning("unexpected tls.cert_directory: %r", cert_directory)
+ _LOGGER.warning("unexpected tls.cert_directory: %r", cert_directory)
return
cert_directory = os.path.join(
os.path.dirname(os.path.dirname(os.path.realpath(__file__))), cert_directory)
if not (cert_directory and os.path.isdir(cert_directory)):
- Config._logger.warning("ignoring invalid cert_directory: %s", cert_directory)
+ _LOGGER.warning("ignoring invalid cert_directory: %s", cert_directory)
return
Config.tls_cacert_file = Config._get_tls_file_path(tls_config, cert_directory, "cacert")
@@ -213,16 +215,16 @@ class Config(object):
"server_ca_chain")
finally:
- Config._logger.info("tls_cacert_file = %s", Config.tls_cacert_file)
- Config._logger.info("tls_server_cert_file = %s", Config.tls_server_cert_file)
- Config._logger.info("tls_private_key_file = %s", Config.tls_private_key_file)
- Config._logger.info("tls_server_ca_chain_file = %s", Config.tls_server_ca_chain_file)
+ _LOGGER.info("tls_cacert_file = %s", Config.tls_cacert_file)
+ _LOGGER.info("tls_server_cert_file = %s", Config.tls_server_cert_file)
+ _LOGGER.info("tls_private_key_file = %s", Config.tls_private_key_file)
+ _LOGGER.info("tls_server_ca_chain_file = %s", Config.tls_server_ca_chain_file)
@staticmethod
def init_config(file_path=None):
"""read and store the config from config file"""
if Config._local_config.is_loaded():
- Config._logger.info("config already inited: %s", Config._local_config)
+ _LOGGER.info("config already inited: %s", Config._local_config)
return
if not file_path:
@@ -234,10 +236,10 @@ class Config(object):
loaded_config = json.load(config_json)
if not loaded_config:
- Config._logger.warning("config not loaded from file: %s", file_path)
+ _LOGGER.warning("config not loaded from file: %s", file_path)
return
- Config._logger.info("config loaded from file: %s", file_path)
+ _LOGGER.info("config loaded from file(%s): %s", file_path, Audit.json_dumps(loaded_config))
logging_config = loaded_config.get("logging")
if logging_config:
logging.config.dictConfig(logging_config)
@@ -249,13 +251,15 @@ class Config(object):
if not Config.consul_timeout_in_secs or Config.consul_timeout_in_secs < 1:
Config.consul_timeout_in_secs = Config.DEFAULT_TIMEOUT_IN_SECS
+ Config._pdp_api_version = os.environ.get(
+ Config.PDP_API_VERSION, loaded_config.get(Config.PDP_API_VERSION.lower()))
+
local_config = loaded_config.get(Config.SERVICE_NAME_POLICY_HANDLER, {})
Config.system_name = local_config.get(Config.FIELD_SYSTEM, Config.system_name)
Config._set_tls_config(local_config.get(Config.FIELD_TLS))
Config._local_config.set_config(local_config, auto_commit=True)
- Config._logger.info("config loaded from file(%s): %s", file_path, Config._local_config)
@staticmethod
def discover(audit):
@@ -265,14 +269,14 @@ class Config(object):
new_config = DiscoveryClient.get_value(audit, discovery_key)
if not new_config or not isinstance(new_config, dict):
- Config._logger.warning("unexpected config from discovery: %s", new_config)
+ _LOGGER.warning("unexpected config from discovery: %s", new_config)
return
- Config._logger.debug("loaded config from discovery(%s): %s",
- discovery_key, Audit.json_dumps(new_config))
+ _LOGGER.debug("loaded config from discovery(%s): %s",
+ discovery_key, Audit.json_dumps(new_config))
Config.discovered_config.set_config(new_config.get(Config.SERVICE_NAME_POLICY_HANDLER))
- Config._logger.info("config from discovery: %s", Config.discovered_config)
+ _LOGGER.info("config from discovery: %s", Config.discovered_config)
@staticmethod
@@ -302,3 +306,11 @@ class Config(object):
def get_requests_kwargs(tls_ca_mode=None):
"""generate kwargs with verify for requests based on the tls_ca_mode"""
return {Config.REQUESTS_VERIFY: Config.get_tls_verify(tls_ca_mode)}
+
+ @staticmethod
+ def is_pdp_api_default(log_status=True):
+ """whether to use the old (2018) or the default pdp API (started in 2019)"""
+ is_default = (Config._pdp_api_version is None)
+ if log_status:
+ _LOGGER.info("_pdp_api_version(%s) default(%s)", Config._pdp_api_version, is_default)
+ return is_default
diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py
index 0ffacba..a127e54 100644
--- a/policyhandler/deploy_handler.py
+++ b/policyhandler/deploy_handler.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,12 +14,10 @@
# limitations under the License.
# ============LICENSE_END=========================================================
#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
"""send policy-update notification to deployment-handler"""
import json
-import logging
from copy import copy, deepcopy
from threading import Lock
@@ -32,7 +30,9 @@ from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode,
from .policy_consts import (CATCH_UP, LATEST_POLICIES, POLICIES,
POLICY_FILTER_MATCHES, POLICY_FILTERS,
REMOVED_POLICIES, TARGET_ENTITY)
+from .utils import Utils
+_LOGGER = Utils.get_logger(__file__)
class PolicyUpdateMessage(object):
"""class for messages to deployment-handler on policy-update"""
@@ -143,7 +143,6 @@ class PolicyUpdateMessage(object):
class DeployHandler(object):
"""calling the deployment-handler web apis"""
- _logger = logging.getLogger("policy_handler.deploy_handler")
DEFAULT_TARGET_ENTITY = "deployment_handler"
DEFAULT_TIMEOUT_IN_SECS = 60
@@ -202,7 +201,7 @@ class DeployHandler(object):
tls_ca_mode = config_dh.get(Config.TLS_CA_MODE)
DeployHandler._custom_kwargs = Config.get_requests_kwargs(tls_ca_mode)
- DeployHandler._logger.info(
+ _LOGGER.info(
"dns based routing to %s: url(%s) tls_ca_mode(%s) custom_kwargs(%s)",
DeployHandler._target_entity, DeployHandler._url,
tls_ca_mode, json.dumps(DeployHandler._custom_kwargs))
@@ -221,8 +220,8 @@ class DeployHandler(object):
DeployHandler._target_entity)
DeployHandler._url_policy = str(DeployHandler._url or "") + '/policy'
- DeployHandler._logger.info("got %s policy url(%s): %s", DeployHandler._target_entity,
- DeployHandler._url_policy, DeployHandler._settings)
+ _LOGGER.info("got %s policy url(%s): %s", DeployHandler._target_entity,
+ DeployHandler._url_policy, DeployHandler._settings)
DeployHandler._settings.commit_change()
DeployHandler._lazy_inited = bool(DeployHandler._url)
@@ -310,12 +309,12 @@ class DeployHandler(object):
json.dumps(params), timeout_in_secs, json.dumps(custom_kwargs))
log_line = log_action + " " + log_data
- DeployHandler._logger.info(log_line)
+ _LOGGER.info(log_line)
metrics.metrics_start(log_line)
if not DeployHandler._url:
error_msg = "no url found to {0}".format(log_line)
- DeployHandler._logger.error(error_msg)
+ _LOGGER.error(error_msg)
metrics.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
audit.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
metrics.metrics(error_msg)
@@ -331,7 +330,7 @@ class DeployHandler(object):
else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
error_msg = "failed to {} {}: {} {}".format(
log_action, type(ex).__name__, str(ex), log_data)
- DeployHandler._logger.exception(error_msg)
+ _LOGGER.exception(error_msg)
metrics.set_http_status_code(error_code)
audit.set_http_status_code(error_code)
metrics.metrics(error_msg)
@@ -345,10 +344,10 @@ class DeployHandler(object):
metrics.metrics(log_line)
if res.status_code != requests.codes.ok:
- DeployHandler._logger.error(log_line)
+ _LOGGER.error(log_line)
return
- DeployHandler._logger.info(log_line)
+ _LOGGER.info(log_line)
result = res.json() or {}
DeployHandler._server_instance_changed(result, metrics)
@@ -379,12 +378,12 @@ class DeployHandler(object):
json.dumps(headers), json.dumps(params), timeout_in_secs, json.dumps(custom_kwargs))
log_line = log_action + " " + log_data
- DeployHandler._logger.info(log_line)
+ _LOGGER.info(log_line)
metrics.metrics_start(log_line)
if not DeployHandler._url:
error_msg = "no url found to {}".format(log_line)
- DeployHandler._logger.error(error_msg)
+ _LOGGER.error(error_msg)
metrics.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
audit.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
metrics.metrics(error_msg)
@@ -400,7 +399,7 @@ class DeployHandler(object):
else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
error_msg = "failed to {} {}: {} {}".format(
log_action, type(ex).__name__, str(ex), log_data)
- DeployHandler._logger.exception(error_msg)
+ _LOGGER.exception(error_msg)
metrics.set_http_status_code(error_code)
audit.set_http_status_code(error_code)
metrics.metrics(error_msg)
@@ -414,7 +413,7 @@ class DeployHandler(object):
metrics.metrics(log_line)
if res.status_code != requests.codes.ok:
- DeployHandler._logger.error(log_line)
+ _LOGGER.error(log_line)
return None, None
result = res.json() or {}
@@ -424,12 +423,12 @@ class DeployHandler(object):
policy_filters = result.get(POLICY_FILTERS, {})
if not policies and not policy_filters:
audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value)
- DeployHandler._logger.warning(audit.warn(
+ _LOGGER.warning(audit.warn(
"found no deployed policies or policy-filters: {}".format(log_line),
error_code=AuditResponseCode.DATA_ERROR))
return policies, policy_filters
- DeployHandler._logger.info(log_line)
+ _LOGGER.info(log_line)
return policies, policy_filters
@staticmethod
@@ -442,6 +441,6 @@ class DeployHandler(object):
and prev_server_instance_uuid != DeployHandler._server_instance_uuid):
DeployHandler.server_instance_changed = True
- DeployHandler._logger.info(metrics.info(
+ _LOGGER.info(metrics.info(
"deployment_handler_changed: {1} != {0}"
.format(prev_server_instance_uuid, DeployHandler._server_instance_uuid)))
diff --git a/policyhandler/discovery.py b/policyhandler/discovery.py
index 4c5b64e..83f54ac 100644
--- a/policyhandler/discovery.py
+++ b/policyhandler/discovery.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,20 +14,20 @@
# limitations under the License.
# ============LICENSE_END=========================================================
#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
"""client to talk to consul services and kv"""
import base64
import json
-import logging
import requests
from .config import Config
from .customize import CustomizerUser
from .onap.audit import AuditHttpCode, Metrics
+from .utils import Utils
+_LOGGER = Utils.get_logger(__file__)
class DiscoveryClient(object):
"""talking to consul at Config.consul_url
@@ -51,13 +51,12 @@ class DiscoveryClient(object):
CONSUL_ENTITY = "consul"
CONSUL_SERVICE_MASK = "{}/v1/catalog/service/{}"
CONSUL_KV_MASK = "{}/v1/kv/{}"
- _logger = logging.getLogger("policy_handler.discovery")
@staticmethod
def _discover_service(audit, service_name, service_path):
"""find the service record in consul"""
response = requests.get(service_path, timeout=Config.consul_timeout_in_secs)
- DiscoveryClient._logger.info(audit.info("response {} from {}: {}".format(
+ _LOGGER.info(audit.info("response {} from {}: {}".format(
response.status_code, service_path, response.text)))
response.raise_for_status()
@@ -75,7 +74,7 @@ class DiscoveryClient(object):
log_line = "get from {} at {}".format(DiscoveryClient.CONSUL_ENTITY, service_path)
- DiscoveryClient._logger.info(metrics.metrics_start(log_line))
+ _LOGGER.info(metrics.metrics_start(log_line))
status_code = None
try:
(status_code,
@@ -86,7 +85,7 @@ class DiscoveryClient(object):
else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
error_msg = ("failed {}/{} to {} {}: {}".format(status_code, error_code, log_line,
type(ex).__name__, str(ex)))
- DiscoveryClient._logger.exception(error_msg)
+ _LOGGER.exception(error_msg)
metrics.set_http_status_code(error_code)
audit.set_http_status_code(error_code)
metrics.metrics(error_msg)
@@ -95,15 +94,14 @@ class DiscoveryClient(object):
if not service_url:
error_code = AuditHttpCode.DATA_ERROR.value
error_msg = "failed {}/{} to {}".format(status_code, error_code, log_line)
- DiscoveryClient._logger.error(audit.error(error_msg))
+ _LOGGER.error(audit.error(error_msg))
metrics.set_http_status_code(error_code)
audit.set_http_status_code(error_code)
metrics.metrics(error_msg)
return None
log_line = "response {} {}".format(status_code, log_line)
- DiscoveryClient._logger.info(audit.info("got service_url: {} after {}"
- .format(service_url, log_line)))
+ _LOGGER.info(audit.info("got service_url: {} after {}".format(service_url, log_line)))
metrics.set_http_status_code(status_code)
audit.set_http_status_code(status_code)
@@ -128,7 +126,7 @@ class DiscoveryClient(object):
log_line = "get from {} at {}".format(DiscoveryClient.CONSUL_ENTITY, discovery_url)
- DiscoveryClient._logger.info(metrics.metrics_start(log_line))
+ _LOGGER.info(metrics.metrics_start(log_line))
status_code = None
try:
status_code, value = DiscoveryClient._get_value_from_kv(discovery_url)
@@ -138,7 +136,7 @@ class DiscoveryClient(object):
else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
error_msg = ("failed {}/{} to {} {}: {}".format(status_code, error_code, log_line,
type(ex).__name__, str(ex)))
- DiscoveryClient._logger.exception(error_msg)
+ _LOGGER.exception(error_msg)
metrics.set_http_status_code(error_code)
audit.set_http_status_code(error_code)
metrics.metrics(error_msg)
diff --git a/policyhandler/onap/audit.py b/policyhandler/onap/audit.py
index db4498a..3c09c16 100644
--- a/policyhandler/onap/audit.py
+++ b/policyhandler/onap/audit.py
@@ -14,7 +14,6 @@
# limitations under the License.
# ============LICENSE_END=========================================================
#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
"""generic class to keep track of request handling
from receiving it through reponse and log all the activities
@@ -68,6 +67,7 @@ class AuditHttpCode(Enum):
PERMISSION_UNAUTHORIZED_ERROR = 401
PERMISSION_FORBIDDEN_ERROR = 403
RESPONSE_ERROR = 400
+ PAGE_NOT_FOUND_ERROR = 404
SERVER_INTERNAL_ERROR = 500
SERVICE_UNAVAILABLE_ERROR = 503
DATA_ERROR = 1030
@@ -94,7 +94,8 @@ class AuditResponseCode(Enum):
elif http_status_code in [AuditHttpCode.PERMISSION_UNAUTHORIZED_ERROR.value,
AuditHttpCode.PERMISSION_FORBIDDEN_ERROR.value]:
response_code = AuditResponseCode.PERMISSION_ERROR
- elif http_status_code == AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value:
+ elif http_status_code in [AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value,
+ AuditHttpCode.PAGE_NOT_FOUND_ERROR.value]:
response_code = AuditResponseCode.AVAILABILITY_ERROR
elif http_status_code == AuditHttpCode.SERVER_INTERNAL_ERROR.value:
response_code = AuditResponseCode.BUSINESS_PROCESS_ERROR
@@ -125,9 +126,9 @@ class _Audit(object):
:kwargs: - put any request related params into kwargs
"""
- _service_name = ""
+ SERVICE_INSTANCE_UUID = str(uuid.uuid4())
+ service_name = ""
_service_version = ""
- _service_instance_uuid = str(uuid.uuid4())
_started = datetime.utcnow()
_key_format = re.compile(r"\W")
_logger_debug = None
@@ -144,15 +145,15 @@ class _Audit(object):
@staticmethod
def init(service_name, config_file_path):
"""init static invariants and loggers"""
- _Audit._service_name = service_name
+ _Audit.service_name = service_name
_Audit._logger_debug = CommonLogger(config_file_path, "debug", \
- instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name)
+ instanceUUID=_Audit.SERVICE_INSTANCE_UUID, serviceName=_Audit.service_name)
_Audit._logger_error = CommonLogger(config_file_path, "error", \
- instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name)
+ instanceUUID=_Audit.SERVICE_INSTANCE_UUID, serviceName=_Audit.service_name)
_Audit._logger_metrics = CommonLogger(config_file_path, "metrics", \
- instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name)
+ instanceUUID=_Audit.SERVICE_INSTANCE_UUID, serviceName=_Audit.service_name)
_Audit._logger_audit = CommonLogger(config_file_path, "audit", \
- instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name)
+ instanceUUID=_Audit.SERVICE_INSTANCE_UUID, serviceName=_Audit.service_name)
ProcessInfo.init()
try:
_Audit._service_version = subprocess.check_output(
@@ -175,7 +176,7 @@ class _Audit(object):
:req_message: is the request message string for logging
:kwargs: - put any request related params into kwargs
"""
- self.job_name = _Audit._key_format.sub('_', job_name or req_message or _Audit._service_name)
+ self.job_name = _Audit._key_format.sub('_', job_name or req_message or _Audit.service_name)
self.request_id = request_id
self.req_message = req_message or ""
self.kwargs = kwargs or {}
@@ -200,9 +201,9 @@ class _Audit(object):
utcnow = datetime.utcnow()
health = {
"server" : {
- "service_name" : _Audit._service_name,
+ "service_name" : _Audit.service_name,
"service_version" : _Audit._service_version,
- "service_instance_uuid" : _Audit._service_instance_uuid
+ "service_instance_uuid" : _Audit.SERVICE_INSTANCE_UUID
},
"runtime" : {
"started" : str(_Audit._started),
@@ -214,11 +215,12 @@ class _Audit(object):
"process_memory" : ProcessInfo.process_memory()
},
"stats" : _Audit._health.dump(),
- "items" : dict((health_name, health_getter())
- for health_name, health_getter in _Audit._health_checkers.items()),
"soft" : {"python" : _Audit._py_ver, "packages" : _Audit._packages}
}
- self.info("{} health: {}".format(_Audit._service_name,
+ health.update(dict((health_name, health_getter())
+ for health_name, health_getter in _Audit._health_checkers.items())
+ )
+ self.info("{} health: {}".format(_Audit.service_name,
json.dumps(health, sort_keys=True)))
return health
@@ -226,7 +228,7 @@ class _Audit(object):
def process_info(self):
"""get the debug info on all the threads and memory"""
process_info = ProcessInfo.get_all()
- self.info("{} process_info: {}".format(_Audit._service_name, json.dumps(process_info)))
+ self.info("{} process_info: {}".format(_Audit.service_name, json.dumps(process_info)))
return process_info
diff --git a/policyhandler/pdp_api/__init__.py b/policyhandler/pdp_api/__init__.py
new file mode 100644
index 0000000..4d009ed
--- /dev/null
+++ b/policyhandler/pdp_api/__init__.py
@@ -0,0 +1,30 @@
+# ================================================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+
+"""2019 http api to policy-engine https://<policy-engine>:<port>/decision/v1/ POST"""
+
+from .policy_matcher import PolicyMatcher
+from .policy_rest import PolicyRest
+from .policy_listener import PolicyListener
+from .policy_updates import PolicyUpdates
+
+def get_pdp_api_info():
+ """info on which version of pdp api is in effect"""
+ return ("folders: PolicyMatcher({}), PolicyRest({}), PolicyListener({}), PolicyUpdates({})"
+ .format(PolicyMatcher.PDP_API_FOLDER, PolicyRest.PDP_API_FOLDER,
+ PolicyListener.PDP_API_FOLDER, PolicyUpdates.PDP_API_FOLDER
+ ))
diff --git a/policyhandler/pdp_api/pdp_consts.py b/policyhandler/pdp_api/pdp_consts.py
new file mode 100644
index 0000000..2337456
--- /dev/null
+++ b/policyhandler/pdp_api/pdp_consts.py
@@ -0,0 +1,35 @@
+# ================================================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+"""contants of PDP"""
+
+# fields from pdp API 2018
+POLICY_NAME = "policyName"
+POLICY_VERSION = "policyVersion"
+POLICY_CONFIG = 'config'
+
+# fields from pdp API 2019
+PDP_POLICIES = 'policies'
+PDP_PROPERTIES = 'properties'
+PDP_METADATA = 'metadata'
+PDP_POLICY_ID = 'policy-id'
+PDP_POLICY_VERSION = 'policy-version'
+
+# req to PDP
+PDP_REQ_ONAP_NAME = "ONAPName" # always "DCAE"
+PDP_REQ_ONAP_COMPONENT = "ONAPComponent"
+PDP_REQ_ONAP_INSTANCE = "ONAPInstance"
+PDP_REQ_RESOURCE = "resource"
diff --git a/policyhandler/pdp_api/policy_listener.py b/policyhandler/pdp_api/policy_listener.py
new file mode 100644
index 0000000..9fa4695
--- /dev/null
+++ b/policyhandler/pdp_api/policy_listener.py
@@ -0,0 +1,55 @@
+# ================================================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+
+"""
+policy-listener communicates with policy-engine
+to receive push notifications
+on updates and removal of policies.
+
+on receiving the policy-notifications, the policy-receiver
+passes the notifications to policy-updater
+"""
+
+import os
+
+from ..utils import ToBeImplementedException, Utils
+
+_LOGGER = Utils.get_logger(__file__)
+
+class PolicyListener(object):
+ """listener to PolicyEngine"""
+ PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__)))
+
+ def __init__(self, *_):
+ """listener to receive the policy notifications from PolicyEngine"""
+ _LOGGER.info("to_be_implemented")
+ raise ToBeImplementedException()
+
+ def reconfigure(self, _):
+ """configure and reconfigure the listener"""
+ _LOGGER.info("to_be_implemented")
+ raise ToBeImplementedException()
+
+ def run(self):
+ """listen on web-socket and pass the policy notifications to policy-updater"""
+ _LOGGER.info("to_be_implemented")
+ raise ToBeImplementedException()
+
+ def shutdown(self, _):
+ """Shutdown the policy-listener"""
+ _LOGGER.info("to_be_implemented")
+ raise ToBeImplementedException()
diff --git a/policyhandler/pdp_api/policy_matcher.py b/policyhandler/pdp_api/policy_matcher.py
new file mode 100644
index 0000000..57258c3
--- /dev/null
+++ b/policyhandler/pdp_api/policy_matcher.py
@@ -0,0 +1,25 @@
+# ================================================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+
+"""policy-matcher matches the policies from deployment-handler to policies from policy-engine"""
+
+import os
+
+
+class PolicyMatcher(object):
+ """policy-matcher - static class"""
+ PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__)))
diff --git a/policyhandler/pdp_api/policy_rest.py b/policyhandler/pdp_api/policy_rest.py
new file mode 100644
index 0000000..14d9296
--- /dev/null
+++ b/policyhandler/pdp_api/policy_rest.py
@@ -0,0 +1,215 @@
+# ================================================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+
+"""policy-client communicates with policy-engine thru REST API"""
+
+import copy
+import json
+import os
+import urllib.parse
+from threading import Lock
+
+import requests
+
+from ..config import Config, Settings
+from ..onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode,
+ AuditResponseCode, Metrics)
+from ..utils import Utils
+from .pdp_consts import PDP_POLICIES
+from .policy_utils import PolicyUtils
+
+_LOGGER = Utils.get_logger(__file__)
+
+class PolicyRest(object):
+ """using the http API to policy-engine"""
+ PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__)))
+ _lazy_inited = False
+ DEFAULT_TIMEOUT_IN_SECS = 60
+
+ _lock = Lock()
+ _settings = Settings(Config.FIELD_POLICY_ENGINE, Config.POOL_CONNECTIONS)
+
+ _target_entity = None
+ _requests_session = None
+ _url = None
+ _url_pdp_decision = None
+ _headers = None
+ _custom_kwargs = {}
+ _timeout_in_secs = DEFAULT_TIMEOUT_IN_SECS
+
+ @staticmethod
+ def _init():
+ """init static config"""
+ PolicyRest._custom_kwargs = {}
+ tls_ca_mode = None
+
+ if not PolicyRest._requests_session:
+ PolicyRest._requests_session = requests.Session()
+
+ changed, pool_size = PolicyRest._settings.get_by_key(Config.POOL_CONNECTIONS, 20)
+ if changed:
+ PolicyRest._requests_session.mount(
+ 'https://', requests.adapters.HTTPAdapter(pool_connections=pool_size,
+ pool_maxsize=pool_size))
+ PolicyRest._requests_session.mount(
+ 'http://', requests.adapters.HTTPAdapter(pool_connections=pool_size,
+ pool_maxsize=pool_size))
+
+ _, config = PolicyRest._settings.get_by_key(Config.FIELD_POLICY_ENGINE)
+ if config:
+ PolicyRest._url = config.get("url")
+ if PolicyRest._url:
+ PolicyRest._url_pdp_decision = urllib.parse.urljoin(
+ PolicyRest._url, config.get("path_decision", "/decision/v1/"))
+ PolicyRest._headers = config.get("headers", {})
+ PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE)
+
+ tls_ca_mode = config.get(Config.TLS_CA_MODE)
+ PolicyRest._custom_kwargs = Config.get_requests_kwargs(tls_ca_mode)
+ PolicyRest._timeout_in_secs = config.get(Config.TIMEOUT_IN_SECS)
+ if not PolicyRest._timeout_in_secs or PolicyRest._timeout_in_secs < 1:
+ PolicyRest._timeout_in_secs = PolicyRest.DEFAULT_TIMEOUT_IN_SECS
+
+ _LOGGER.info(
+ "PDP(%s) url(%s) headers(%s) tls_ca_mode(%s) timeout_in_secs(%s) custom_kwargs(%s): %s",
+ PolicyRest._target_entity, PolicyRest._url_pdp_decision,
+ Metrics.json_dumps(PolicyRest._headers), tls_ca_mode,
+ PolicyRest._timeout_in_secs, json.dumps(PolicyRest._custom_kwargs),
+ PolicyRest._settings)
+
+ PolicyRest._settings.commit_change()
+ PolicyRest._lazy_inited = True
+
+ @staticmethod
+ def reconfigure():
+ """reconfigure"""
+ with PolicyRest._lock:
+ PolicyRest._settings.set_config(Config.discovered_config)
+ if not PolicyRest._settings.is_changed():
+ PolicyRest._settings.commit_change()
+ return False
+
+ PolicyRest._lazy_inited = False
+ PolicyRest._init()
+ return True
+
+ @staticmethod
+ def _lazy_init():
+ """init static config"""
+ if PolicyRest._lazy_inited:
+ return
+
+ with PolicyRest._lock:
+ if PolicyRest._lazy_inited:
+ return
+
+ PolicyRest._settings.set_config(Config.discovered_config)
+ PolicyRest._init()
+
+ @staticmethod
+ def _pdp_get_decision(audit, pdp_req):
+ """Communication with the policy-engine"""
+ if not PolicyRest._url:
+ _LOGGER.error(
+ audit.error("no url for PDP", error_code=AuditResponseCode.AVAILABILITY_ERROR))
+ audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ return None
+
+ with PolicyRest._lock:
+ session = PolicyRest._requests_session
+ target_entity = PolicyRest._target_entity
+ url = PolicyRest._url_pdp_decision
+ timeout_in_secs = PolicyRest._timeout_in_secs
+ headers = copy.deepcopy(PolicyRest._headers)
+ custom_kwargs = copy.deepcopy(PolicyRest._custom_kwargs)
+
+ metrics = Metrics(aud_parent=audit, targetEntity=target_entity, targetServiceName=url)
+
+ headers[REQUEST_X_ECOMP_REQUESTID] = metrics.request_id
+
+ log_action = "post to {} at {}".format(target_entity, url)
+ log_data = "msg={} headers={}, custom_kwargs({}) timeout_in_secs({})".format(
+ json.dumps(pdp_req), Metrics.json_dumps(headers), json.dumps(custom_kwargs),
+ timeout_in_secs)
+ log_line = log_action + " " + log_data
+
+ _LOGGER.info(metrics.metrics_start(log_line))
+
+ res = None
+ try:
+ res = session.post(url, json=pdp_req, headers=headers, timeout=timeout_in_secs,
+ **custom_kwargs)
+ except Exception as ex:
+ error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
+ if isinstance(ex, requests.exceptions.RequestException)
+ else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ error_msg = ("failed {}: {} to {}".format(type(ex).__name__, str(ex), log_line))
+
+ _LOGGER.exception(error_msg)
+ metrics.set_http_status_code(error_code)
+ audit.set_http_status_code(error_code)
+ metrics.metrics(error_msg)
+ return None
+
+ log_line = "response {} from {}: text={} headers={}".format(
+ res.status_code, log_line, res.text,
+ Metrics.json_dumps(dict(res.request.headers.items())))
+
+ _LOGGER.info(log_line)
+ metrics.set_http_status_code(res.status_code)
+ audit.set_http_status_code(res.status_code)
+ metrics.metrics(log_line)
+
+ policy_bodies = None
+ if res.status_code == requests.codes.ok:
+ policy_bodies = res.json().get(PDP_POLICIES)
+
+ return policy_bodies
+
+ @staticmethod
+ def get_latest_policy(aud_policy_id):
+ """safely try retrieving the latest policy for the policy_id from the policy-engine"""
+ audit, policy_id, _, _ = aud_policy_id
+ try:
+ PolicyRest._lazy_init()
+
+ pdp_req = PolicyUtils.gen_req_to_pdp(policy_id)
+ policy_bodies = PolicyRest._pdp_get_decision(audit, pdp_req)
+
+ log_line = "looking for policy_id({}) in policy_bodies: {}".format(
+ policy_id, json.dumps(policy_bodies))
+ _LOGGER.info(log_line)
+
+ latest_policy = None
+ if policy_bodies and policy_id in policy_bodies:
+ latest_policy = PolicyUtils.convert_to_policy(policy_bodies[policy_id])
+
+ if not PolicyUtils.validate_policy(latest_policy):
+ audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value)
+ _LOGGER.error(audit.error(
+ "received invalid policy from PDP: {}".format(json.dumps(latest_policy)),
+ error_code=AuditResponseCode.DATA_ERROR))
+
+ return latest_policy
+ except Exception as ex:
+ error_msg = ("{}: get_latest_policy({}) crash {}: {}"
+ .format(audit.request_id, policy_id, type(ex).__name__, str(ex)))
+
+ _LOGGER.exception(error_msg)
+ audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+ audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ return None
diff --git a/policyhandler/pdp_api/policy_updates.py b/policyhandler/pdp_api/policy_updates.py
new file mode 100644
index 0000000..eb3c3d1
--- /dev/null
+++ b/policyhandler/pdp_api/policy_updates.py
@@ -0,0 +1,49 @@
+# ================================================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+
+"""policy-updates accumulates the policy-update notifications from PDP"""
+
+import os
+
+from ..utils import Utils, ToBeImplementedException
+
+
+_LOGGER = Utils.get_logger(__file__)
+
+class PolicyUpdates(object):
+ """Keep and consolidate the policy-updates (audit, policies_updated, policies_removed)"""
+ PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__)))
+
+ def __init__(self):
+ """init and reset"""
+
+ def reset(self):
+ """resets the state"""
+ self.__init__()
+
+ def pop_policy_updates(self):
+ """
+ Returns the consolidated (audit, policies_updated, policies_removed)
+ and resets the state
+ """
+ _LOGGER.info("to_be_implemented")
+ return None, None, None
+
+ def push_policy_updates(self, *_):
+ """consolidate the new policies_updated, policies_removed to existing ones"""
+ _LOGGER.info("to_be_implemented")
+ raise ToBeImplementedException()
diff --git a/policyhandler/pdp_api/policy_utils.py b/policyhandler/pdp_api/policy_utils.py
new file mode 100644
index 0000000..1d06d14
--- /dev/null
+++ b/policyhandler/pdp_api/policy_utils.py
@@ -0,0 +1,123 @@
+# ================================================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+
+"""utils for policy usage and conversions"""
+
+from ..onap.audit import Audit
+from ..policy_consts import POLICY_BODY, POLICY_ID
+from .pdp_consts import (PDP_METADATA, PDP_POLICY_ID,
+ PDP_POLICY_VERSION, PDP_PROPERTIES,
+ PDP_REQ_ONAP_COMPONENT, PDP_REQ_ONAP_INSTANCE,
+ PDP_REQ_ONAP_NAME, PDP_REQ_RESOURCE, POLICY_CONFIG,
+ POLICY_NAME, POLICY_VERSION)
+
+
+class PolicyUtils(object):
+ """policy-client utils"""
+
+ @staticmethod
+ def gen_req_to_pdp(policy_id):
+ """request to get a single policy from pdp by policy_id"""
+ return {
+ PDP_REQ_ONAP_NAME: "DCAE",
+ PDP_REQ_ONAP_COMPONENT: Audit.service_name,
+ PDP_REQ_ONAP_INSTANCE: Audit.SERVICE_INSTANCE_UUID,
+ "action": "configure",
+ PDP_REQ_RESOURCE: {PDP_POLICY_ID: [policy_id]}
+ }
+
+ @staticmethod
+ def convert_to_policy(policy_body):
+ """
+ set policy id, name, version, config=properties and
+ wrap policy_body received from policy-engine with policy_id
+
+ input:
+ {
+ "type": "onap.policies.monitoring.cdap.tca.hi.lo.app",
+ "version": "1.0.0",
+ "metadata": {
+ "policy-id": "onap.scaleout.tca",
+ "policy-version": 1,
+ "description": "The scaleout policy for vDNS"
+ },
+ "properties": {
+ "tca_policy": {
+ "domain": "measurementsForVfScaling",
+ "metricsPerEventName": [
+ {
+ "eventName": "vLoadBalancer",
+ "controlLoopSchemaType": "VNF",
+ "policyScope": "type=configuration"
+ }
+ ]
+ }
+ }
+ }
+
+ output:
+ {
+ "policy_id": "onap.scaleout.tca",
+ "policy_body": {
+ "policyName": "onap.scaleout.tca.1.xml",
+ "policyVersion": 1,
+ "type": "onap.policies.monitoring.cdap.tca.hi.lo.app",
+ "version": "1.0.0",
+ "metadata": {
+ "policy-id": "onap.scaleout.tca",
+ "policy-version": 1,
+ "description": "The scaleout policy for vDNS"
+ },
+ "config": {
+ "tca_policy": {
+ "domain": "measurementsForVfScaling",
+ "metricsPerEventName": [
+ {
+ "eventName": "vLoadBalancer",
+ "controlLoopSchemaType": "VNF",
+ "policyScope": "type=configuration"
+ }
+ ]
+ }
+ }
+ }
+ }
+ """
+ if not policy_body or not policy_body.get(PDP_PROPERTIES):
+ return None
+
+ pdp_metadata = policy_body.get(PDP_METADATA, {})
+ policy_id = pdp_metadata.get(PDP_POLICY_ID)
+ policy_version = pdp_metadata.get(PDP_POLICY_VERSION)
+ if not policy_id or not policy_version:
+ return None
+
+ policy_body[POLICY_NAME] = "{}.{}.xml".format(policy_id, policy_version)
+ policy_body[POLICY_VERSION] = str(policy_version)
+ policy_body[POLICY_CONFIG] = policy_body[PDP_PROPERTIES]
+ del policy_body[PDP_PROPERTIES]
+
+ return {POLICY_ID:policy_id, POLICY_BODY:policy_body}
+
+ @staticmethod
+ def validate_policy(policy):
+ """validate have non-empty config in policy"""
+ if not policy:
+ return False
+
+ policy_body = policy.get(POLICY_BODY)
+ return bool(policy_body and policy_body.get(POLICY_CONFIG))
diff --git a/policyhandler/pdp_api_v0/__init__.py b/policyhandler/pdp_api_v0/__init__.py
new file mode 100644
index 0000000..0196508
--- /dev/null
+++ b/policyhandler/pdp_api_v0/__init__.py
@@ -0,0 +1,30 @@
+# ================================================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+
+"""<=2018 http api to policy-engine /getConfig that is going to be replaced in 2019"""
+
+from .policy_matcher import PolicyMatcher
+from .policy_rest import PolicyRest
+from .policy_listener import PolicyListener
+from .policy_updates import PolicyUpdates
+
+def get_pdp_api_info():
+ """info on which version of pdp api is in effect"""
+ return ("folders: PolicyMatcher({}), PolicyRest({}), PolicyListener({}), PolicyUpdates({})"
+ .format(PolicyMatcher.PDP_API_FOLDER, PolicyRest.PDP_API_FOLDER,
+ PolicyListener.PDP_API_FOLDER, PolicyUpdates.PDP_API_FOLDER
+ ))
diff --git a/policyhandler/pdp_api_v0/pdp_consts.py b/policyhandler/pdp_api_v0/pdp_consts.py
new file mode 100644
index 0000000..d1c0b44
--- /dev/null
+++ b/policyhandler/pdp_api_v0/pdp_consts.py
@@ -0,0 +1,23 @@
+# ================================================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+
+"""contants of PDP"""
+
+POLICY_VERSION = "policyVersion"
+POLICY_NAME = "policyName"
+POLICY_CONFIG = 'config'
+MATCHING_CONDITIONS = "matchingConditions"
diff --git a/policyhandler/pdp_api_v0/policy_listener.py b/policyhandler/pdp_api_v0/policy_listener.py
new file mode 100644
index 0000000..67e4c49
--- /dev/null
+++ b/policyhandler/pdp_api_v0/policy_listener.py
@@ -0,0 +1,309 @@
+# ================================================================================
+# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+
+"""
+policy-listener communicates with policy-engine
+thru web-socket to receive push notifications
+on updates and removal of policies.
+
+on receiving the policy-notifications, the policy-receiver
+passes the notifications to policy-updater
+"""
+
+import copy
+import json
+import os
+import ssl
+import time
+import urllib.parse
+from datetime import datetime
+from threading import Lock, Thread
+
+import websocket
+
+from ..config import Config, Settings
+from ..onap.audit import Audit
+from ..utils import Utils
+from .pdp_consts import MATCHING_CONDITIONS, POLICY_NAME, POLICY_VERSION
+
+LOADED_POLICIES = 'loadedPolicies'
+REMOVED_POLICIES = 'removedPolicies'
+POLICY_VER = 'versionNo'
+POLICY_MATCHES = 'matches'
+
+_LOGGER = Utils.get_logger(__file__)
+
+class PolicyListener(Thread):
+ """web-socket to PolicyEngine"""
+ PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__)))
+ WS_STARTED = "started"
+ WS_START_COUNT = "start_count"
+ WS_CLOSE_COUNT = "close_count"
+ WS_ERROR_COUNT = "error_count"
+ WS_PONG_COUNT = "pong_count"
+ WS_MESSAGE_COUNT = "message_count"
+ WS_MESSAGE_TIMESTAMP = "message_timestamp"
+ WS_STATUS = "status"
+ WS_PING_INTERVAL_DEFAULT = 30
+ WEB_SOCKET_HEALTH = "web_socket_health"
+
+ def __init__(self, audit, policy_updater):
+ """web-socket inside the thread to receive policy notifications from PolicyEngine"""
+ Thread.__init__(self, name="policy_receiver", daemon=True)
+
+ self._policy_updater = policy_updater
+ self._lock = Lock()
+ self._keep_running = True
+ self._settings = Settings(Config.FIELD_POLICY_ENGINE)
+
+ self._sleep_before_restarting = 5
+ self._web_socket_url = None
+ self._web_socket_sslopt = None
+ self._tls_wss_ca_mode = None
+ self._web_socket = None
+ self._ws_ping_interval_in_secs = PolicyListener.WS_PING_INTERVAL_DEFAULT
+ self._web_socket_health = {
+ PolicyListener.WS_START_COUNT: 0,
+ PolicyListener.WS_CLOSE_COUNT: 0,
+ PolicyListener.WS_ERROR_COUNT: 0,
+ PolicyListener.WS_PONG_COUNT: 0,
+ PolicyListener.WS_MESSAGE_COUNT: 0,
+ PolicyListener.WS_STATUS: "created"
+ }
+
+ Audit.register_item_health(PolicyListener.WEB_SOCKET_HEALTH, self._get_health)
+ self.reconfigure(audit)
+
+ def reconfigure(self, audit):
+ """configure and reconfigure the web-socket"""
+ with self._lock:
+ _LOGGER.info(audit.info("web_socket_health {}".format(
+ json.dumps(self._get_health(), sort_keys=True))))
+ self._sleep_before_restarting = 5
+ self._settings.set_config(Config.discovered_config)
+ changed, config = self._settings.get_by_key(Config.FIELD_POLICY_ENGINE)
+
+ if not changed:
+ self._settings.commit_change()
+ return False
+
+ prev_web_socket_url = self._web_socket_url
+ prev_web_socket_sslopt = self._web_socket_sslopt
+ prev_ws_ping_interval_in_secs = self._ws_ping_interval_in_secs
+
+ self._web_socket_sslopt = None
+
+ resturl = urllib.parse.urljoin(config.get("url", "").lower().rstrip("/") + "/",
+ config.get("path_notifications", "/pdp/notifications"))
+
+ self._tls_wss_ca_mode = config.get(Config.TLS_WSS_CA_MODE)
+
+ self._ws_ping_interval_in_secs = config.get(Config.WS_PING_INTERVAL_IN_SECS)
+ if not self._ws_ping_interval_in_secs or self._ws_ping_interval_in_secs < 60:
+ self._ws_ping_interval_in_secs = PolicyListener.WS_PING_INTERVAL_DEFAULT
+
+ if resturl.startswith("https:"):
+ self._web_socket_url = resturl.replace("https:", "wss:")
+
+ verify = Config.get_tls_verify(self._tls_wss_ca_mode)
+ if verify is False:
+ self._web_socket_sslopt = {'cert_reqs': ssl.CERT_NONE}
+ elif verify is True:
+ pass
+ else:
+ self._web_socket_sslopt = {'ca_certs': verify}
+
+ else:
+ self._web_socket_url = resturl.replace("http:", "ws:")
+
+ log_changed = (
+ "changed web_socket_url(%s) or tls_wss_ca_mode(%s)"
+ " or ws_ping_interval_in_secs(%s): %s" %
+ (self._web_socket_url, self._tls_wss_ca_mode, self._ws_ping_interval_in_secs,
+ self._settings))
+ if (self._web_socket_url == prev_web_socket_url
+ and Utils.are_the_same(prev_web_socket_sslopt, self._web_socket_sslopt)
+ and prev_ws_ping_interval_in_secs == self._ws_ping_interval_in_secs):
+ _LOGGER.info(audit.info("not {}".format(log_changed)))
+ self._settings.commit_change()
+ return False
+
+ _LOGGER.info(audit.info(log_changed))
+ self._settings.commit_change()
+
+ self._stop_notifications()
+ return True
+
+ def run(self):
+ """listen on web-socket and pass the policy notifications to policy-updater"""
+ _LOGGER.info("starting policy_receiver...")
+ websocket.enableTrace(True)
+ restarting = False
+ while True:
+ if not self._get_keep_running():
+ break
+
+ self._stop_notifications()
+
+ if restarting:
+ with self._lock:
+ sleep_before_restarting = self._sleep_before_restarting
+ _LOGGER.info(
+ "going to sleep for %s secs before restarting policy-notifications",
+ sleep_before_restarting)
+
+ time.sleep(sleep_before_restarting)
+ if not self._get_keep_running():
+ break
+
+ with self._lock:
+ web_socket_url = self._web_socket_url
+ sslopt = copy.deepcopy(self._web_socket_sslopt)
+ tls_wss_ca_mode = self._tls_wss_ca_mode
+ ws_ping_interval_in_secs = self._ws_ping_interval_in_secs
+
+ _LOGGER.info(
+ "connecting to policy-notifications at %s with sslopt(%s) tls_wss_ca_mode(%s)"
+ " ws_ping_interval_in_secs(%s)",
+ web_socket_url, json.dumps(sslopt), tls_wss_ca_mode, ws_ping_interval_in_secs)
+
+ self._web_socket = websocket.WebSocketApp(
+ web_socket_url,
+ on_open=self._on_ws_open,
+ on_message=self._on_pdp_message,
+ on_close=self._on_ws_close,
+ on_error=self._on_ws_error,
+ on_pong=self._on_ws_pong
+ )
+
+ _LOGGER.info("waiting for policy-notifications...")
+ self._web_socket.run_forever(sslopt=sslopt, ping_interval=ws_ping_interval_in_secs)
+ restarting = True
+
+ Audit.register_item_health(PolicyListener.WEB_SOCKET_HEALTH)
+ _LOGGER.info("exit policy-receiver")
+
+ def _get_keep_running(self):
+ """thread-safe check whether to continue running"""
+ with self._lock:
+ keep_running = self._keep_running
+ return keep_running
+
+ def _stop_notifications(self):
+ """close the web-socket == stops the notification service if running."""
+ with self._lock:
+ if self._web_socket and self._web_socket.sock and self._web_socket.sock.connected:
+ self._web_socket.close()
+ _LOGGER.info("stopped receiving notifications from PDP")
+
+ def _on_pdp_message(self, *args):
+ """received the notification from PDP"""
+ self._web_socket_health[PolicyListener.WS_MESSAGE_COUNT] += 1
+ self._web_socket_health[PolicyListener.WS_MESSAGE_TIMESTAMP] = str(datetime.utcnow())
+ try:
+ message = args and args[-1]
+ _LOGGER.info("Received notification message: %s", message)
+ _LOGGER.info("web_socket_health %s", json.dumps(self._get_health(), sort_keys=True))
+ if not message:
+ return
+ message = json.loads(message)
+
+ if not message or not isinstance(message, dict):
+ _LOGGER.warning("unexpected message from PDP: %s", json.dumps(message))
+ return
+
+ policies_updated = [
+ {POLICY_NAME: policy.get(POLICY_NAME),
+ POLICY_VERSION: policy.get(POLICY_VER),
+ MATCHING_CONDITIONS: policy.get(POLICY_MATCHES, {})}
+ for policy in message.get(LOADED_POLICIES, [])
+ ]
+
+ policies_removed = [
+ {POLICY_NAME: removed_policy.get(POLICY_NAME),
+ POLICY_VERSION: removed_policy.get(POLICY_VER)}
+ for removed_policy in message.get(REMOVED_POLICIES, [])
+ ]
+
+ if not policies_updated and not policies_removed:
+ _LOGGER.info("no policy updated or removed")
+ return
+
+ self._policy_updater.policy_update(policies_updated, policies_removed)
+ except Exception as ex:
+ error_msg = "crash {} {} at {}: {}".format(type(ex).__name__, str(ex),
+ "on_pdp_message", json.dumps(message))
+
+ _LOGGER.exception(error_msg)
+
+ def _on_ws_error(self, error):
+ """report an error"""
+ _LOGGER.exception("policy-notification error %s", str(error))
+ self._sleep_before_restarting = 60 if isinstance(error, ssl.SSLError) else 5
+
+ self._web_socket_health[PolicyListener.WS_STATUS] = "error"
+ self._web_socket_health[PolicyListener.WS_ERROR_COUNT] += 1
+ self._web_socket_health["last_error"] = {
+ "error": str(error), "timestamp": str(datetime.utcnow())
+ }
+ _LOGGER.info("web_socket_health %s", json.dumps(self._get_health(), sort_keys=True))
+
+ def _on_ws_close(self, code, reason):
+ """restart web-socket on close"""
+ self._web_socket_health["last_closed"] = str(datetime.utcnow())
+ self._web_socket_health[PolicyListener.WS_STATUS] = "closed"
+ self._web_socket_health[PolicyListener.WS_CLOSE_COUNT] += 1
+ _LOGGER.info(
+ "lost connection(%s, %s) to PDP web_socket_health %s",
+ code, reason, json.dumps(self._get_health(), sort_keys=True))
+
+ def _on_ws_open(self):
+ """started web-socket"""
+ self._web_socket_health[PolicyListener.WS_STATUS] = PolicyListener.WS_STARTED
+ self._web_socket_health[PolicyListener.WS_START_COUNT] += 1
+ self._web_socket_health[PolicyListener.WS_STARTED] = datetime.utcnow()
+ _LOGGER.info("opened connection to PDP web_socket_health %s",
+ json.dumps(self._get_health(), sort_keys=True))
+
+ def _on_ws_pong(self, pong):
+ """pong = response to pinging the server of the web-socket"""
+ self._web_socket_health[PolicyListener.WS_PONG_COUNT] += 1
+ _LOGGER.info(
+ "pong(%s) from connection to PDP web_socket_health %s",
+ pong, json.dumps(self._get_health(), sort_keys=True))
+
+ def _get_health(self):
+ """returns the healthcheck of the web-socket as json"""
+ web_socket_health = copy.deepcopy(self._web_socket_health)
+ web_socket_health[Config.WS_PING_INTERVAL_IN_SECS] = self._ws_ping_interval_in_secs
+ started = web_socket_health.get(PolicyListener.WS_STARTED)
+ if started:
+ web_socket_health[PolicyListener.WS_STARTED] = str(started)
+ web_socket_health["uptime"] = str(datetime.utcnow() - started)
+ return web_socket_health
+
+
+ def shutdown(self, audit):
+ """Shutdown the policy-listener"""
+ _LOGGER.info(audit.info("shutdown policy-listener"))
+ with self._lock:
+ self._keep_running = False
+
+ self._stop_notifications()
+
+ if self.is_alive():
+ self.join()
diff --git a/policyhandler/policy_matcher.py b/policyhandler/pdp_api_v0/policy_matcher.py
index d0786ba..357af49 100644
--- a/policyhandler/policy_matcher.py
+++ b/policyhandler/pdp_api_v0/policy_matcher.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,27 +14,28 @@
# limitations under the License.
# ============LICENSE_END=========================================================
#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
"""policy-matcher matches the policies from deployment-handler to policies from policy-engine"""
import json
-import logging
+import os
import re
-from .deploy_handler import DeployHandler, PolicyUpdateMessage
-from .onap.audit import AuditHttpCode, AuditResponseCode
-from .policy_consts import (ERRORED_POLICIES, LATEST_POLICIES,
- MATCHING_CONDITIONS, POLICY_BODY, POLICY_FILTER,
- POLICY_NAME, POLICY_VERSION, POLICY_VERSIONS)
+from ..deploy_handler import DeployHandler, PolicyUpdateMessage
+from ..onap.audit import AuditHttpCode, AuditResponseCode
+from ..policy_consts import (ERRORED_POLICIES, LATEST_POLICIES, POLICY_BODY,
+ POLICY_FILTER, POLICY_VERSIONS)
+from ..utils import RegexCoarser, Utils
+from .pdp_consts import MATCHING_CONDITIONS, POLICY_NAME, POLICY_VERSION
from .policy_rest import PolicyRest
-from .policy_utils import RegexCoarser
+_LOGGER = Utils.get_logger(__file__)
+
class PolicyMatcher(object):
"""policy-matcher - static class"""
- _logger = logging.getLogger("policy_handler.policy_matcher")
PENDING_UPDATE = "pending_update"
+ PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__)))
@staticmethod
def get_deployed_policies(audit):
@@ -43,12 +44,12 @@ class PolicyMatcher(object):
if audit.is_not_found():
warning_txt = "got no deployed policies or policy-filters"
- PolicyMatcher._logger.warning(warning_txt)
+ _LOGGER.warning(warning_txt)
return {"warning": warning_txt}, None, None
if not audit.is_success() or (not deployed_policies and not deployed_policy_filters):
error_txt = "failed to retrieve policies from deployment-handler"
- PolicyMatcher._logger.error(error_txt)
+ _LOGGER.error(error_txt)
return {"error": error_txt}, None, None
return None, deployed_policies, deployed_policy_filters
@@ -62,7 +63,7 @@ class PolicyMatcher(object):
if not (deployed_policies or deployed_policy_filters):
error_txt = "no deployed policies or policy-filters"
- PolicyMatcher._logger.warning(error_txt)
+ _LOGGER.warning(error_txt)
return {"error": error_txt}, None
coarse_regex_patterns = PolicyMatcher.calc_coarse_patterns(
@@ -72,7 +73,7 @@ class PolicyMatcher(object):
error_txt = ("failed to construct the coarse_regex_patterns from " +
"deployed_policies: {} and deployed_policy_filters: {}"
.format(deployed_policies, deployed_policy_filters))
- PolicyMatcher._logger.error(audit.error(
+ _LOGGER.error(audit.error(
error_txt, error_code=AuditResponseCode.DATA_ERROR))
audit.set_http_status_code(AuditHttpCode.DATA_ERROR.value)
return {"error": error_txt}, None
@@ -84,7 +85,7 @@ class PolicyMatcher(object):
if not audit.is_success():
error_txt = "failed to retrieve policies from policy-engine"
- PolicyMatcher._logger.warning(error_txt)
+ _LOGGER.warning(error_txt)
return {"error": error_txt}, None
latest_policies = pdp_response.get(LATEST_POLICIES, {})
@@ -123,7 +124,7 @@ class PolicyMatcher(object):
coarse_regex.add_regex_pattern(policy_name_pattern)
coarse_regex_patterns = coarse_regex.get_coarse_regex_patterns()
- PolicyMatcher._logger.debug(
+ _LOGGER.debug(
audit.debug("coarse_regex_patterns({}) combined_regex_pattern({}) for patterns({})"
.format(coarse_regex_patterns,
coarse_regex.get_combined_regex_pattern(),
@@ -219,11 +220,11 @@ class PolicyMatcher(object):
log_line = "policy {} to filter id {}: {}".format(json.dumps(policy),
policy_filter_id,
json.dumps(policy_filter))
- # PolicyMatcher._logger.debug(audit.debug("matching {}...".format(log_line)))
+ # _LOGGER.debug(audit.debug("matching {}...".format(log_line)))
if (filter_policy_name != policy_id and filter_policy_name != policy_name
and not re.match(filter_policy_name, policy_name)):
- PolicyMatcher._logger.debug(
+ _LOGGER.debug(
audit.debug("not match by policyName: {} != {}: {}"
.format(policy_name, filter_policy_name, log_line)))
return False
@@ -235,7 +236,7 @@ class PolicyMatcher(object):
filter_onap_name = policy_filter.get("onapName")
policy_onap_name = matching_conditions.get("ONAPName")
if filter_onap_name and filter_onap_name != policy_onap_name:
- PolicyMatcher._logger.debug(
+ _LOGGER.debug(
audit.debug("not match by ONAPName: {} != {}: {}"
.format(policy_onap_name, filter_onap_name, log_line)))
return False
@@ -243,7 +244,7 @@ class PolicyMatcher(object):
filter_config_name = policy_filter.get("configName")
policy_config_name = matching_conditions.get("ConfigName")
if filter_config_name and filter_config_name != policy_config_name:
- PolicyMatcher._logger.debug(
+ _LOGGER.debug(
audit.debug("not match by configName: {} != {}: {}"
.format(policy_config_name, filter_config_name, log_line)))
return False
@@ -253,12 +254,12 @@ class PolicyMatcher(object):
for filter_key, filter_config_attribute in filter_config_attributes.items():
if (filter_key not in matching_conditions
or filter_config_attribute != matching_conditions.get(filter_key)):
- PolicyMatcher._logger.debug(
+ _LOGGER.debug(
audit.debug("not match by configAttributes: {} != {}: {}"
.format(json.dumps(matching_conditions),
json.dumps(filter_config_attributes),
log_line)))
return False
- PolicyMatcher._logger.debug(audit.debug("matched {}".format(log_line)))
+ _LOGGER.debug(audit.debug("matched {}".format(log_line)))
return True
diff --git a/policyhandler/policy_rest.py b/policyhandler/pdp_api_v0/policy_rest.py
index 85dd914..c59625e 100644
--- a/policyhandler/policy_rest.py
+++ b/policyhandler/pdp_api_v0/policy_rest.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,13 +14,12 @@
# limitations under the License.
# ============LICENSE_END=========================================================
#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
"""policy-client communicates with policy-engine thru REST API"""
import copy
import json
-import logging
+import os
import time
import urllib.parse
from multiprocessing.dummy import Pool as ThreadPool
@@ -28,18 +27,21 @@ from threading import Lock
import requests
-from .config import Config, Settings
-from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode,
- AuditResponseCode, Metrics)
-from .policy_consts import (ERRORED_POLICIES, LATEST_POLICIES, POLICY_BODY,
- POLICY_CONFIG, POLICY_FILTER, POLICY_FILTERS,
- POLICY_ID, POLICY_NAME)
+from ..config import Config, Settings
+from ..onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode,
+ AuditResponseCode, Metrics)
+from ..policy_consts import (ERRORED_POLICIES, LATEST_POLICIES, POLICY_BODY,
+ POLICY_FILTER, POLICY_FILTERS, POLICY_ID,
+ POLICY_NAMES)
+from ..utils import Utils
+from .pdp_consts import POLICY_CONFIG, POLICY_NAME, POLICY_VERSION
from .policy_utils import PolicyUtils
+_LOGGER = Utils.get_logger(__file__)
class PolicyRest(object):
"""using the http API to policy-engine"""
- _logger = logging.getLogger("policy_handler.policy_rest")
+ PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__)))
_lazy_inited = False
POLICY_GET_CONFIG = 'getConfig'
PDP_CONFIG_STATUS = "policyConfigStatus"
@@ -60,6 +62,7 @@ class PolicyRest(object):
Config.POLICY_RETRY_COUNT, Config.POLICY_RETRY_SLEEP)
_requests_session = None
+ _url = None
_url_get_config = None
_headers = None
_target_entity = None
@@ -73,8 +76,7 @@ class PolicyRest(object):
def _init():
"""init static config"""
PolicyRest._custom_kwargs = {}
-
- _, config = PolicyRest._settings.get_by_key(Config.FIELD_POLICY_ENGINE)
+ tls_ca_mode = None
if not PolicyRest._requests_session:
PolicyRest._requests_session = requests.Session()
@@ -88,28 +90,33 @@ class PolicyRest(object):
'http://', requests.adapters.HTTPAdapter(pool_connections=pool_size,
pool_maxsize=pool_size))
- get_config_path = urllib.parse.urljoin(
- config.get("path_api", "pdp/api").strip("/") + "/", PolicyRest.POLICY_GET_CONFIG)
- PolicyRest._url_get_config = urllib.parse.urljoin(config.get("url", ""), get_config_path)
- PolicyRest._headers = config.get("headers", {})
- PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE)
- _, PolicyRest._thread_pool_size = PolicyRest._settings.get_by_key(
- Config.THREAD_POOL_SIZE, 4)
- if PolicyRest._thread_pool_size < 2:
- PolicyRest._thread_pool_size = 2
-
- _, PolicyRest._policy_retry_count = PolicyRest._settings.get_by_key(
- Config.POLICY_RETRY_COUNT, 1)
- _, PolicyRest._policy_retry_sleep = PolicyRest._settings.get_by_key(
- Config.POLICY_RETRY_SLEEP, 0)
-
- tls_ca_mode = config.get(Config.TLS_CA_MODE)
- PolicyRest._custom_kwargs = Config.get_requests_kwargs(tls_ca_mode)
- PolicyRest._timeout_in_secs = config.get(Config.TIMEOUT_IN_SECS)
- if not PolicyRest._timeout_in_secs or PolicyRest._timeout_in_secs < 1:
- PolicyRest._timeout_in_secs = PolicyRest.DEFAULT_TIMEOUT_IN_SECS
-
- PolicyRest._logger.info(
+ _, config = PolicyRest._settings.get_by_key(Config.FIELD_POLICY_ENGINE)
+ if config:
+ PolicyRest._url = config.get("url")
+ if PolicyRest._url:
+ path_get_config = urllib.parse.urljoin(
+ config.get("path_api", "pdp/api").strip("/")
+ + "/", PolicyRest.POLICY_GET_CONFIG)
+ PolicyRest._url_get_config = urllib.parse.urljoin(PolicyRest._url, path_get_config)
+ PolicyRest._headers = config.get("headers", {})
+ PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE)
+ _, PolicyRest._thread_pool_size = PolicyRest._settings.get_by_key(
+ Config.THREAD_POOL_SIZE, 4)
+ if PolicyRest._thread_pool_size < 2:
+ PolicyRest._thread_pool_size = 2
+
+ _, PolicyRest._policy_retry_count = PolicyRest._settings.get_by_key(
+ Config.POLICY_RETRY_COUNT, 1)
+ _, PolicyRest._policy_retry_sleep = PolicyRest._settings.get_by_key(
+ Config.POLICY_RETRY_SLEEP, 0)
+
+ tls_ca_mode = config.get(Config.TLS_CA_MODE)
+ PolicyRest._custom_kwargs = Config.get_requests_kwargs(tls_ca_mode)
+ PolicyRest._timeout_in_secs = config.get(Config.TIMEOUT_IN_SECS)
+ if not PolicyRest._timeout_in_secs or PolicyRest._timeout_in_secs < 1:
+ PolicyRest._timeout_in_secs = PolicyRest.DEFAULT_TIMEOUT_IN_SECS
+
+ _LOGGER.info(
"PDP(%s) url(%s) headers(%s) tls_ca_mode(%s) timeout_in_secs(%s) custom_kwargs(%s): %s",
PolicyRest._target_entity, PolicyRest._url_get_config,
Metrics.json_dumps(PolicyRest._headers), tls_ca_mode,
@@ -148,6 +155,12 @@ class PolicyRest(object):
@staticmethod
def _pdp_get_config(audit, json_body):
"""Communication with the policy-engine"""
+ if not PolicyRest._url:
+ _LOGGER.error(
+ audit.error("no url for PDP", error_code=AuditResponseCode.AVAILABILITY_ERROR))
+ audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ return None
+
with PolicyRest._lock:
session = PolicyRest._requests_session
target_entity = PolicyRest._target_entity
@@ -166,7 +179,7 @@ class PolicyRest(object):
timeout_in_secs)
log_line = log_action + " " + log_data
- PolicyRest._logger.info(metrics.metrics_start(log_line))
+ _LOGGER.info(metrics.metrics_start(log_line))
res = None
try:
@@ -178,7 +191,7 @@ class PolicyRest(object):
else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
error_msg = ("failed {}: {} to {}".format(type(ex).__name__, str(ex), log_line))
- PolicyRest._logger.exception(error_msg)
+ _LOGGER.exception(error_msg)
metrics.set_http_status_code(error_code)
audit.set_http_status_code(error_code)
metrics.metrics(error_msg)
@@ -195,7 +208,7 @@ class PolicyRest(object):
metrics.set_http_status_code(res.status_code)
metrics.metrics(log_line)
- PolicyRest._logger.info(log_line)
+ _LOGGER.info(log_line)
return res.status_code, res_data
@staticmethod
@@ -213,7 +226,7 @@ class PolicyRest(object):
error_code = AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
error_msg = "{} unexpected {}".format(error_code, log_line)
- PolicyRest._logger.error(error_msg)
+ _LOGGER.error(error_msg)
metrics.set_http_status_code(error_code)
audit.set_http_status_code(error_code)
metrics.metrics(error_msg)
@@ -236,7 +249,7 @@ class PolicyRest(object):
status_code = AuditHttpCode.DATA_NOT_FOUND_OK.value
info_msg = "{} not found {}".format(status_code, log_line)
- PolicyRest._logger.info(info_msg)
+ _LOGGER.info(info_msg)
metrics.set_http_status_code(status_code)
metrics.metrics(info_msg)
return status_code, None
@@ -273,7 +286,7 @@ class PolicyRest(object):
.format(audit.request_id, type(ex).__name__, str(ex),
"get_latest_policy", str_metrics))
- PolicyRest._logger.exception(error_msg)
+ _LOGGER.exception(error_msg)
audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
return None
@@ -290,8 +303,7 @@ class PolicyRest(object):
expect_policy_removed = (ignore_policy_names and not expected_versions)
for retry in range(1, PolicyRest._policy_retry_count + 1):
- PolicyRest._logger.debug("try(%s) retry_get_config(%s): %s",
- retry, retry_get_config, str_metrics)
+ _LOGGER.debug("try(%s) retry_get_config(%s): %s", retry, retry_get_config, str_metrics)
done, latest_policy, status_code = PolicyRest._get_latest_policy_once(
audit, policy_id, expected_versions, ignore_policy_names,
@@ -301,13 +313,13 @@ class PolicyRest(object):
break
if retry == PolicyRest._policy_retry_count:
- PolicyRest._logger.error(
+ _LOGGER.error(
audit.error("gave up retrying after #{} for policy_id({}) from PDP {}"
.format(retry, policy_id, PolicyRest._url_get_config),
error_code=AuditResponseCode.DATA_ERROR))
break
- PolicyRest._logger.warning(audit.warn(
+ _LOGGER.warning(audit.warn(
"will retry({}) for policy_id({}) in {} secs from PDP {}".format(
retry, policy_id, PolicyRest._policy_retry_sleep, PolicyRest._url_get_config),
error_code=AuditResponseCode.DATA_ERROR))
@@ -321,7 +333,7 @@ class PolicyRest(object):
audit.set_http_status_code(status_code)
if not PolicyRest._validate_policy(latest_policy):
audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value)
- PolicyRest._logger.error(audit.error(
+ _LOGGER.error(audit.error(
"received invalid policy from PDP: {}".format(json.dumps(latest_policy)),
error_code=AuditResponseCode.DATA_ERROR))
@@ -335,15 +347,15 @@ class PolicyRest(object):
status_code, policy_bodies = PolicyRest._pdp_get_config(audit, {POLICY_NAME:policy_id})
- PolicyRest._logger.debug("%s %s policy_bodies: %s",
- status_code, policy_id, json.dumps(policy_bodies or []))
+ _LOGGER.debug("%s %s policy_bodies: %s",
+ status_code, policy_id, json.dumps(policy_bodies or []))
latest_policy = PolicyUtils.select_latest_policy(
policy_bodies, expected_versions, ignore_policy_names
)
if not latest_policy and not expect_policy_removed:
- PolicyRest._logger.error(
+ _LOGGER.error(
audit.error("received unexpected policy data from PDP for policy_id={}: {}"
.format(policy_id, json.dumps(policy_bodies or [])),
error_code=AuditResponseCode.DATA_ERROR))
@@ -355,9 +367,16 @@ class PolicyRest(object):
return done, latest_policy, status_code
@staticmethod
- def get_latest_updated_policies(aud_policy_updates):
+ def get_latest_updated_policies(audit, updated_policies, removed_policies):
"""safely try retrieving the latest policies for the list of policy_names"""
- audit, policies_updated, policies_removed = aud_policy_updates
+ if not updated_policies and not removed_policies:
+ return None, None
+
+ policies_updated = [(policy_id, policy.get(POLICY_BODY, {}).get(POLICY_VERSION))
+ for policy_id, policy in updated_policies.items()]
+ policies_removed = [(policy_id, policy.get(POLICY_NAMES, {}))
+ for policy_id, policy in removed_policies.items()]
+
if not policies_updated and not policies_removed:
return None, None
@@ -374,7 +393,7 @@ class PolicyRest(object):
.format(audit.request_id, type(ex).__name__, str(ex),
"get_latest_updated_policies", str_metrics))
- PolicyRest._logger.exception(error_msg)
+ _LOGGER.exception(error_msg)
audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
return None, None
@@ -389,7 +408,7 @@ class PolicyRest(object):
targetServiceName=PolicyRest._url_get_config)
metrics_total.metrics_start("get_latest_updated_policies {0}".format(str_metrics))
- PolicyRest._logger.debug(str_metrics)
+ _LOGGER.debug(str_metrics)
policies_to_find = {}
for (policy_id, policy_version) in policies_updated:
@@ -424,8 +443,8 @@ class PolicyRest(object):
policies = None
apns_length = len(apns)
- PolicyRest._logger.debug("apns_length(%s) policies_to_find %s", apns_length,
- json.dumps(policies_to_find))
+ _LOGGER.debug("apns_length(%s) policies_to_find %s", apns_length,
+ json.dumps(policies_to_find))
if apns_length == 1:
policies = [PolicyRest.get_latest_policy(apns[0])]
@@ -454,7 +473,7 @@ class PolicyRest(object):
if policy_id not in updated_policies
and policy_id not in removed_policies)
- PolicyRest._logger.debug(
+ _LOGGER.debug(
"result(%s) updated_policies %s, removed_policies %s, errored_policies %s",
apns_length, json.dumps(updated_policies), json.dumps(removed_policies),
json.dumps(errored_policies))
@@ -474,19 +493,19 @@ class PolicyRest(object):
audit, policy_filter = aud_policy_filter
try:
str_policy_filter = json.dumps(policy_filter)
- PolicyRest._logger.debug("%s", str_policy_filter)
+ _LOGGER.debug("%s", str_policy_filter)
status_code, policy_bodies = PolicyRest._pdp_get_config(audit, policy_filter)
audit.set_http_status_code(status_code)
- PolicyRest._logger.debug("%s policy_bodies: %s %s", status_code,
- str_policy_filter, json.dumps(policy_bodies or []))
+ _LOGGER.debug("%s policy_bodies: %s %s", status_code,
+ str_policy_filter, json.dumps(policy_bodies or []))
latest_policies = PolicyUtils.select_latest_policies(policy_bodies)
if not latest_policies:
audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value)
- PolicyRest._logger.warning(audit.warn(
+ _LOGGER.warning(audit.warn(
"received no policies from PDP for policy_filter {}: {}"
.format(str_policy_filter, json.dumps(policy_bodies or [])),
error_code=AuditResponseCode.DATA_ERROR))
@@ -506,7 +525,7 @@ class PolicyRest(object):
.format(audit.request_id, type(ex).__name__, str(ex),
"_get_latest_policies", json.dumps(policy_filter)))
- PolicyRest._logger.exception(error_msg)
+ _LOGGER.exception(error_msg)
audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
return None, None
@@ -545,7 +564,7 @@ class PolicyRest(object):
else:
return result
- PolicyRest._logger.debug("%s", str_policy_filters)
+ _LOGGER.debug("%s", str_policy_filters)
metrics_total = Metrics(aud_parent=audit, targetEntity=target_entity,
targetServiceName=PolicyRest._url_get_config)
@@ -571,8 +590,8 @@ class PolicyRest(object):
result[ERRORED_POLICIES] = dict(
pair for (_, eps) in latest_policies if eps for pair in eps.items())
- PolicyRest._logger.debug("got policies for policy_filters: %s. result: %s",
- str_policy_filters, json.dumps(result))
+ _LOGGER.debug("got policies for policy_filters: %s. result: %s",
+ str_policy_filters, json.dumps(result))
return result
except Exception as ex:
@@ -580,7 +599,7 @@ class PolicyRest(object):
.format(audit.request_id, type(ex).__name__, str(ex),
"get_latest_policies", str_metrics))
- PolicyRest._logger.exception(error_msg)
+ _LOGGER.exception(error_msg)
audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
return None
diff --git a/policyhandler/pdp_api_v0/policy_updates.py b/policyhandler/pdp_api_v0/policy_updates.py
new file mode 100644
index 0000000..eafdca2
--- /dev/null
+++ b/policyhandler/pdp_api_v0/policy_updates.py
@@ -0,0 +1,107 @@
+# ================================================================================
+# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+
+"""policy-updates accumulates the policy-update notifications from PDP"""
+
+import json
+import os
+
+from ..onap.audit import Audit
+from ..policy_consts import POLICY_ID, POLICY_NAMES
+from ..utils import Utils
+from .pdp_consts import POLICY_NAME
+from .policy_utils import PolicyUtils
+
+
+_LOGGER = Utils.get_logger(__file__)
+
+class PolicyUpdates(object):
+ """Keep and consolidate the policy-updates (audit, policies_updated, policies_removed)"""
+ PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__)))
+
+ def __init__(self):
+ """init and reset"""
+ self._audit = None
+ self._policies_updated = {}
+ self._policies_removed = {}
+
+ def reset(self):
+ """resets the state"""
+ self.__init__()
+
+ def pop_policy_updates(self):
+ """
+ Returns the consolidated (audit, policies_updated, policies_removed)
+ and resets the state
+ """
+ if not self._audit:
+ return None, None, None
+
+ audit = self._audit
+ policies_updated = self._policies_updated
+ policies_removed = self._policies_removed
+
+ self.reset()
+
+ return audit, policies_updated, policies_removed
+
+
+ def push_policy_updates(self, policies_updated, policies_removed):
+ """consolidate the new policies_updated, policies_removed to existing ones"""
+ for policy_body in policies_updated:
+ policy_name = policy_body.get(POLICY_NAME)
+ policy = PolicyUtils.convert_to_policy(policy_body)
+ if not policy:
+ continue
+ policy_id = policy.get(POLICY_ID)
+
+ self._policies_updated[policy_id] = policy
+
+ rm_policy_names = self._policies_removed.get(policy_id, {}).get(POLICY_NAMES)
+ if rm_policy_names and policy_name in rm_policy_names:
+ del rm_policy_names[policy_name]
+
+ for policy_body in policies_removed:
+ policy_name = policy_body.get(POLICY_NAME)
+ policy = PolicyUtils.convert_to_policy(policy_body)
+ if not policy:
+ continue
+ policy_id = policy.get(POLICY_ID)
+
+ if policy_id in self._policies_removed:
+ policy = self._policies_removed[policy_id]
+
+ if POLICY_NAMES not in policy:
+ policy[POLICY_NAMES] = {}
+ policy[POLICY_NAMES][policy_name] = True
+ self._policies_removed[policy_id] = policy
+
+ req_message = ("policy-update notification - updated[{0}], removed[{1}]"
+ .format(len(self._policies_updated),
+ len(self._policies_removed)))
+
+ if not self._audit:
+ self._audit = Audit(job_name="policy_update",
+ req_message=req_message,
+ retry_get_config=True)
+ else:
+ self._audit.req_message = req_message
+
+ _LOGGER.info(
+ "pending(%s) for %s policies_updated %s policies_removed %s",
+ self._audit.request_id, req_message,
+ json.dumps(self._policies_updated), json.dumps(self._policies_removed))
diff --git a/policyhandler/pdp_api_v0/policy_utils.py b/policyhandler/pdp_api_v0/policy_utils.py
new file mode 100644
index 0000000..d337665
--- /dev/null
+++ b/policyhandler/pdp_api_v0/policy_utils.py
@@ -0,0 +1,120 @@
+# ================================================================================
+# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+
+"""utils for policy usage and conversions"""
+
+import re
+
+from ..policy_consts import POLICY_BODY, POLICY_ID
+from ..utils import Utils
+from .pdp_consts import POLICY_CONFIG, POLICY_NAME, POLICY_VERSION
+
+
+class PolicyUtils(object):
+ """policy-client utils"""
+ _policy_name_ext = re.compile('[.][0-9]+[.][a-zA-Z]+$')
+
+ @staticmethod
+ def extract_policy_id(policy_name):
+ """ policy_name = policy_id + "." + <version> + "." + <extension>
+ For instance,
+ policy_name = DCAE_alex.Config_alex_policy_number_1.3.xml
+ policy_id = DCAE_alex.Config_alex_policy_number_1
+ policy_scope = DCAE_alex
+ policy_class = Config
+ policy_version = 3
+ type = extension = xml
+ delimiter = "."
+ policy_class_delimiter = "_"
+ policy_name in PAP = DCAE_alex.alex_policy_number_1
+ """
+ if not policy_name:
+ return
+ return PolicyUtils._policy_name_ext.sub('', policy_name)
+
+ @staticmethod
+ def parse_policy_config(policy):
+ """try parsing the config in policy."""
+ if not policy:
+ return policy
+ config = policy.get(POLICY_BODY, {}).get(POLICY_CONFIG)
+ if config:
+ policy[POLICY_BODY][POLICY_CONFIG] = Utils.safe_json_parse(config)
+ return policy
+
+ @staticmethod
+ def convert_to_policy(policy_body):
+ """wrap policy_body received from policy-engine with policy_id."""
+ if not policy_body:
+ return None
+ policy_name = policy_body.get(POLICY_NAME)
+ policy_version = policy_body.get(POLICY_VERSION)
+ if not policy_name or not policy_version:
+ return None
+ policy_id = PolicyUtils.extract_policy_id(policy_name)
+ if not policy_id:
+ return None
+ return {POLICY_ID:policy_id, POLICY_BODY:policy_body}
+
+ @staticmethod
+ def select_latest_policy(policy_bodies, expected_versions=None, ignore_policy_names=None):
+ """For some reason, the policy-engine returns all version of the policy_bodies.
+ DCAE-Controller is only interested in the latest version
+ """
+ if not policy_bodies:
+ return
+ latest_policy_body = {}
+ for policy_body in policy_bodies:
+ policy_name = policy_body.get(POLICY_NAME)
+ policy_version = policy_body.get(POLICY_VERSION)
+ if not policy_name or not policy_version or not policy_version.isdigit():
+ continue
+ if expected_versions and policy_version not in expected_versions:
+ continue
+ if ignore_policy_names and policy_name in ignore_policy_names:
+ continue
+
+ if (not latest_policy_body
+ or int(latest_policy_body[POLICY_VERSION]) < int(policy_version)):
+ latest_policy_body = policy_body
+
+ return PolicyUtils.parse_policy_config(PolicyUtils.convert_to_policy(latest_policy_body))
+
+ @staticmethod
+ def select_latest_policies(policy_bodies):
+ """For some reason, the policy-engine returns all version of the policy_bodies.
+ DCAE-Controller is only interested in the latest versions
+ """
+ if not policy_bodies:
+ return {}
+ policies = {}
+ for policy_body in policy_bodies:
+ policy = PolicyUtils.convert_to_policy(policy_body)
+ if not policy:
+ continue
+ policy_id = policy.get(POLICY_ID)
+ policy_version = policy.get(POLICY_BODY, {}).get(POLICY_VERSION)
+ if not policy_id or not policy_version or not policy_version.isdigit():
+ continue
+ if (policy_id not in policies
+ or int(policy_version) > int(policies[policy_id][POLICY_BODY][POLICY_VERSION])):
+ policies[policy_id] = policy
+
+ for policy_id in policies:
+ policies[policy_id] = PolicyUtils.parse_policy_config(policies[policy_id])
+
+ return policies
diff --git a/policyhandler/pdp_client.py b/policyhandler/pdp_client.py
new file mode 100644
index 0000000..353b604
--- /dev/null
+++ b/policyhandler/pdp_client.py
@@ -0,0 +1,29 @@
+# ================================================================================
+# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+
+"""policy-client communicates with policy-engine thru REST API"""
+
+from .config import Config
+from .utils import Utils
+
+if Config.is_pdp_api_default():
+ from .pdp_api import *
+else:
+ from .pdp_api_v0 import *
+
+_LOGGER = Utils.get_logger(__file__)
+_LOGGER.info(get_pdp_api_info())
diff --git a/policyhandler/policy_consts.py b/policyhandler/policy_consts.py
index 8f3ec76..2eafc53 100644
--- a/policyhandler/policy_consts.py
+++ b/policyhandler/policy_consts.py
@@ -14,15 +14,11 @@
# limitations under the License.
# ============LICENSE_END=========================================================
#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
"""contants of policy-handler"""
POLICY_ID = 'policy_id'
-POLICY_VERSION = "policyVersion"
-POLICY_NAME = "policyName"
POLICY_BODY = 'policy_body'
-POLICY_CONFIG = 'config'
CATCH_UP = "catch_up"
AUTO_CATCH_UP = "auto catch_up"
@@ -34,7 +30,6 @@ POLICY_FILTER = "policy_filter"
POLICY_FILTERS = "policy_filters"
POLICIES = "policies"
POLICY_VERSIONS = "policy_versions"
-MATCHING_CONDITIONS = "matchingConditions"
POLICY_NAMES = "policy_names"
POLICY_FILTER_MATCHES = "policy_filter_matches"
TARGET_ENTITY = "target_entity"
diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py
index 7cf1869..d949c4b 100644
--- a/policyhandler/policy_receiver.py
+++ b/policyhandler/policy_receiver.py
@@ -14,7 +14,6 @@
# limitations under the License.
# ============LICENSE_END=========================================================
#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
"""
policy-receiver communicates with policy-engine
@@ -25,323 +24,37 @@ on receiving the policy-notifications, the policy-receiver
passes the notifications to policy-updater
"""
-import copy
-import json
-import logging
-import ssl
-import time
-import urllib.parse
-from datetime import datetime
-from threading import Lock, Thread
-
-import websocket
-
-from .config import Config, Settings
-from .onap.audit import Audit
-from .policy_consts import MATCHING_CONDITIONS, POLICY_NAME, POLICY_VERSION
-from .policy_updater import PolicyUpdater
-from .policy_utils import Utils
from .service_activator import ServiceActivator
-LOADED_POLICIES = 'loadedPolicies'
-REMOVED_POLICIES = 'removedPolicies'
-POLICY_VER = 'versionNo'
-POLICY_MATCHES = 'matches'
-
-class _PolicyReceiver(Thread):
- """web-socket to PolicyEngine"""
- _logger = logging.getLogger("policy_handler.policy_receiver")
- WS_STARTED = "started"
- WS_START_COUNT = "start_count"
- WS_CLOSE_COUNT = "close_count"
- WS_ERROR_COUNT = "error_count"
- WS_PONG_COUNT = "pong_count"
- WS_MESSAGE_COUNT = "message_count"
- WS_MESSAGE_TIMESTAMP = "message_timestamp"
- WS_STATUS = "status"
- WS_PING_INTERVAL_DEFAULT = 30
- WEB_SOCKET_HEALTH = "web_socket_health"
-
- def __init__(self, audit, policy_updater):
- """web-socket inside the thread to receive policy notifications from PolicyEngine"""
- Thread.__init__(self, name="policy_receiver", daemon=True)
-
- self._policy_updater = policy_updater
- self._lock = Lock()
- self._keep_running = True
- self._settings = Settings(Config.FIELD_POLICY_ENGINE)
-
- self._sleep_before_restarting = 5
- self._web_socket_url = None
- self._web_socket_sslopt = None
- self._tls_wss_ca_mode = None
- self._web_socket = None
- self._ws_ping_interval_in_secs = _PolicyReceiver.WS_PING_INTERVAL_DEFAULT
- self._web_socket_health = {
- _PolicyReceiver.WS_START_COUNT: 0,
- _PolicyReceiver.WS_CLOSE_COUNT: 0,
- _PolicyReceiver.WS_ERROR_COUNT: 0,
- _PolicyReceiver.WS_PONG_COUNT: 0,
- _PolicyReceiver.WS_MESSAGE_COUNT: 0,
- _PolicyReceiver.WS_STATUS: "created"
- }
-
- Audit.register_item_health(_PolicyReceiver.WEB_SOCKET_HEALTH, self._get_health)
- self.reconfigure(audit)
-
- def reconfigure(self, audit):
- """configure and reconfigure the web-socket"""
- with self._lock:
- _PolicyReceiver._logger.info(audit.info("web_socket_health {}".format(
- json.dumps(self._get_health(), sort_keys=True))))
- self._sleep_before_restarting = 5
- self._settings.set_config(Config.discovered_config)
- changed, config = self._settings.get_by_key(Config.FIELD_POLICY_ENGINE)
-
- if not changed:
- self._settings.commit_change()
- return False
-
- prev_web_socket_url = self._web_socket_url
- prev_web_socket_sslopt = self._web_socket_sslopt
- prev_ws_ping_interval_in_secs = self._ws_ping_interval_in_secs
-
- self._web_socket_sslopt = None
-
- resturl = urllib.parse.urljoin(config.get("url", "").lower().rstrip("/") + "/",
- config.get("path_notifications", "/pdp/notifications"))
-
- self._tls_wss_ca_mode = config.get(Config.TLS_WSS_CA_MODE)
-
- self._ws_ping_interval_in_secs = config.get(Config.WS_PING_INTERVAL_IN_SECS)
- if not self._ws_ping_interval_in_secs or self._ws_ping_interval_in_secs < 60:
- self._ws_ping_interval_in_secs = _PolicyReceiver.WS_PING_INTERVAL_DEFAULT
-
- if resturl.startswith("https:"):
- self._web_socket_url = resturl.replace("https:", "wss:")
-
- verify = Config.get_tls_verify(self._tls_wss_ca_mode)
- if verify is False:
- self._web_socket_sslopt = {'cert_reqs': ssl.CERT_NONE}
- elif verify is True:
- pass
- else:
- self._web_socket_sslopt = {'ca_certs': verify}
-
- else:
- self._web_socket_url = resturl.replace("http:", "ws:")
-
- log_changed = (
- "changed web_socket_url(%s) or tls_wss_ca_mode(%s)"
- " or ws_ping_interval_in_secs(%s): %s" %
- (self._web_socket_url, self._tls_wss_ca_mode, self._ws_ping_interval_in_secs,
- self._settings))
- if (self._web_socket_url == prev_web_socket_url
- and Utils.are_the_same(prev_web_socket_sslopt, self._web_socket_sslopt)
- and prev_ws_ping_interval_in_secs == self._ws_ping_interval_in_secs):
- _PolicyReceiver._logger.info(audit.info("not {}".format(log_changed)))
- self._settings.commit_change()
- return False
-
- _PolicyReceiver._logger.info(audit.info(log_changed))
- self._settings.commit_change()
-
- self._stop_notifications()
- return True
-
- def run(self):
- """listen on web-socket and pass the policy notifications to policy-updater"""
- _PolicyReceiver._logger.info("starting policy_receiver...")
- websocket.enableTrace(True)
- restarting = False
- while True:
- if not self._get_keep_running():
- break
-
- self._stop_notifications()
-
- if restarting:
- with self._lock:
- sleep_before_restarting = self._sleep_before_restarting
- _PolicyReceiver._logger.info(
- "going to sleep for %s secs before restarting policy-notifications",
- sleep_before_restarting)
-
- time.sleep(sleep_before_restarting)
- if not self._get_keep_running():
- break
-
- with self._lock:
- web_socket_url = self._web_socket_url
- sslopt = copy.deepcopy(self._web_socket_sslopt)
- tls_wss_ca_mode = self._tls_wss_ca_mode
- ws_ping_interval_in_secs = self._ws_ping_interval_in_secs
-
- _PolicyReceiver._logger.info(
- "connecting to policy-notifications at %s with sslopt(%s) tls_wss_ca_mode(%s)"
- " ws_ping_interval_in_secs(%s)",
- web_socket_url, json.dumps(sslopt), tls_wss_ca_mode, ws_ping_interval_in_secs)
-
- self._web_socket = websocket.WebSocketApp(
- web_socket_url,
- on_open=self._on_ws_open,
- on_message=self._on_pdp_message,
- on_close=self._on_ws_close,
- on_error=self._on_ws_error,
- on_pong=self._on_ws_pong
- )
-
- _PolicyReceiver._logger.info("waiting for policy-notifications...")
- self._web_socket.run_forever(sslopt=sslopt, ping_interval=ws_ping_interval_in_secs)
- restarting = True
-
- Audit.register_item_health(_PolicyReceiver.WEB_SOCKET_HEALTH)
- _PolicyReceiver._logger.info("exit policy-receiver")
-
- def _get_keep_running(self):
- """thread-safe check whether to continue running"""
- with self._lock:
- keep_running = self._keep_running
- return keep_running
-
- def _stop_notifications(self):
- """close the web-socket == stops the notification service if running."""
- with self._lock:
- if self._web_socket and self._web_socket.sock and self._web_socket.sock.connected:
- self._web_socket.close()
- _PolicyReceiver._logger.info("stopped receiving notifications from PDP")
-
- def _on_pdp_message(self, *args):
- """received the notification from PDP"""
- self._web_socket_health[_PolicyReceiver.WS_MESSAGE_COUNT] += 1
- self._web_socket_health[_PolicyReceiver.WS_MESSAGE_TIMESTAMP] = str(datetime.utcnow())
- try:
- message = args and args[-1]
- _PolicyReceiver._logger.info("Received notification message: %s", message)
- _PolicyReceiver._logger.info("web_socket_health %s",
- json.dumps(self._get_health(), sort_keys=True))
- if not message:
- return
- message = json.loads(message)
-
- if not message or not isinstance(message, dict):
- _PolicyReceiver._logger.warning("unexpected message from PDP: %s",
- json.dumps(message))
- return
-
- policies_updated = [
- {POLICY_NAME: policy.get(POLICY_NAME),
- POLICY_VERSION: policy.get(POLICY_VER),
- MATCHING_CONDITIONS: policy.get(POLICY_MATCHES, {})}
- for policy in message.get(LOADED_POLICIES, [])
- ]
-
- policies_removed = [
- {POLICY_NAME: removed_policy.get(POLICY_NAME),
- POLICY_VERSION: removed_policy.get(POLICY_VER)}
- for removed_policy in message.get(REMOVED_POLICIES, [])
- ]
-
- if not policies_updated and not policies_removed:
- _PolicyReceiver._logger.info("no policy updated or removed")
- return
-
- self._policy_updater.policy_update(policies_updated, policies_removed)
- except Exception as ex:
- error_msg = "crash {} {} at {}: {}".format(type(ex).__name__, str(ex),
- "on_pdp_message", json.dumps(message))
-
- _PolicyReceiver._logger.exception(error_msg)
-
- def _on_ws_error(self, error):
- """report an error"""
- _PolicyReceiver._logger.exception("policy-notification error %s", str(error))
- self._sleep_before_restarting = 60 if isinstance(error, ssl.SSLError) else 5
-
- self._web_socket_health[_PolicyReceiver.WS_STATUS] = "error"
- self._web_socket_health[_PolicyReceiver.WS_ERROR_COUNT] += 1
- self._web_socket_health["last_error"] = {
- "error": str(error), "timestamp": str(datetime.utcnow())
- }
- _PolicyReceiver._logger.info("web_socket_health %s",
- json.dumps(self._get_health(), sort_keys=True))
-
- def _on_ws_close(self, code, reason):
- """restart web-socket on close"""
- self._web_socket_health["last_closed"] = str(datetime.utcnow())
- self._web_socket_health[_PolicyReceiver.WS_STATUS] = "closed"
- self._web_socket_health[_PolicyReceiver.WS_CLOSE_COUNT] += 1
- _PolicyReceiver._logger.info(
- "lost connection(%s, %s) to PDP web_socket_health %s",
- code, reason, json.dumps(self._get_health(), sort_keys=True))
-
- def _on_ws_open(self):
- """started web-socket"""
- self._web_socket_health[_PolicyReceiver.WS_STATUS] = _PolicyReceiver.WS_STARTED
- self._web_socket_health[_PolicyReceiver.WS_START_COUNT] += 1
- self._web_socket_health[_PolicyReceiver.WS_STARTED] = datetime.utcnow()
- _PolicyReceiver._logger.info("opened connection to PDP web_socket_health %s",
- json.dumps(self._get_health(), sort_keys=True))
-
- def _on_ws_pong(self, pong):
- """pong = response to pinging the server of the web-socket"""
- self._web_socket_health[_PolicyReceiver.WS_PONG_COUNT] += 1
- _PolicyReceiver._logger.info(
- "pong(%s) from connection to PDP web_socket_health %s",
- pong, json.dumps(self._get_health(), sort_keys=True))
-
- def _get_health(self):
- """returns the healthcheck of the web-socket as json"""
- web_socket_health = copy.deepcopy(self._web_socket_health)
- web_socket_health[Config.WS_PING_INTERVAL_IN_SECS] = self._ws_ping_interval_in_secs
- started = web_socket_health.get(_PolicyReceiver.WS_STARTED)
- if started:
- web_socket_health[_PolicyReceiver.WS_STARTED] = str(started)
- web_socket_health["uptime"] = str(datetime.utcnow() - started)
- return web_socket_health
-
-
- def shutdown(self, audit):
- """Shutdown the policy-receiver"""
- _PolicyReceiver._logger.info(audit.info("shutdown policy-receiver"))
- with self._lock:
- self._keep_running = False
-
- self._stop_notifications()
-
- if self.is_alive():
- self.join()
-
-
class PolicyReceiver(object):
"""
policy-receiver - static singleton wrapper around two threads
policy_updater - master thread for all scheduled actions
- policy_receiver - listens to policy-engine through web-socket
+ policy_listener - listens to policy-engine through web-socket
"""
_policy_updater = None
- _policy_receiver = None
+ _policy_listener = None
@staticmethod
def is_running():
"""check whether the policy-receiver runs"""
- return (PolicyReceiver._policy_receiver
- and PolicyReceiver._policy_receiver.is_alive()
+ return (PolicyReceiver._policy_listener
+ and PolicyReceiver._policy_listener.is_alive()
and PolicyReceiver._policy_updater
and PolicyReceiver._policy_updater.is_alive())
@staticmethod
- def _close_receiver(audit):
+ def _close_listener(audit):
"""stop the notification-handler"""
- if PolicyReceiver._policy_receiver:
- policy_receiver = PolicyReceiver._policy_receiver
- PolicyReceiver._policy_receiver = None
+ if PolicyReceiver._policy_listener:
+ policy_receiver = PolicyReceiver._policy_listener
+ PolicyReceiver._policy_listener = None
policy_receiver.shutdown(audit)
@staticmethod
def shutdown(audit):
"""shutdown the notification-handler and policy-updater"""
- PolicyReceiver._close_receiver(audit)
+ PolicyReceiver._close_listener(audit)
PolicyReceiver._policy_updater.shutdown(audit)
@staticmethod
@@ -359,23 +72,26 @@ class PolicyReceiver(object):
"""act on reconfiguration event"""
active = ServiceActivator.is_active_mode_of_operation(audit)
- if not PolicyReceiver._policy_receiver:
+ if not PolicyReceiver._policy_listener:
if active:
- PolicyReceiver._policy_receiver = _PolicyReceiver(audit,
- PolicyReceiver._policy_updater)
- PolicyReceiver._policy_receiver.start()
+ from . import pdp_client
+ PolicyReceiver._policy_listener = pdp_client.PolicyListener(
+ audit, PolicyReceiver._policy_updater
+ )
+ PolicyReceiver._policy_listener.start()
return
if not active:
- PolicyReceiver._close_receiver(audit)
+ PolicyReceiver._close_listener(audit)
return
- PolicyReceiver._policy_receiver.reconfigure(audit)
+ PolicyReceiver._policy_listener.reconfigure(audit)
@staticmethod
def run(audit):
"""run policy_updater and policy_receiver"""
+ from .policy_updater import PolicyUpdater
PolicyReceiver._policy_updater = PolicyUpdater(PolicyReceiver._on_reconfigure)
PolicyReceiver._on_reconfigure(audit)
diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py
index af1ea4b..3fcde40 100644
--- a/policyhandler/policy_updater.py
+++ b/policyhandler/policy_updater.py
@@ -14,109 +14,26 @@
# limitations under the License.
# ============LICENSE_END=========================================================
#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
"""policy-updater thread"""
import json
-import logging
from threading import Event, Lock, Thread
+from . import pdp_client
from .config import Config, Settings
from .deploy_handler import DeployHandler, PolicyUpdateMessage
from .onap.audit import Audit, AuditHttpCode, AuditResponseCode
-from .policy_consts import (AUTO_CATCH_UP, AUTO_RECONFIGURE, CATCH_UP,
- POLICY_BODY, POLICY_ID, POLICY_NAME, POLICY_NAMES,
- POLICY_VERSION)
-from .policy_matcher import PolicyMatcher
-from .policy_rest import PolicyRest
-from .policy_utils import PolicyUtils
+from .policy_consts import AUTO_CATCH_UP, AUTO_RECONFIGURE, CATCH_UP
from .service_activator import ServiceActivator
from .step_timer import StepTimer
+from .utils import Utils
-class _PolicyUpdate(object):
- """Keep and consolidate the policy-updates (audit, policies_updated, policies_removed)"""
- _logger = logging.getLogger("policy_handler.policy_update")
-
- def __init__(self):
- """init and reset"""
- self._audit = None
- self._policies_updated = {}
- self._policies_removed = {}
-
- def reset(self):
- """resets the state"""
- self.__init__()
-
- def pop_policy_update(self):
- """
- Returns the consolidated (audit, policies_updated, policies_removed)
- and resets the state
- """
- if not self._audit:
- return None, None, None
-
- audit = self._audit
- policies_updated = self._policies_updated
- policies_removed = self._policies_removed
-
- self.reset()
-
- return audit, policies_updated, policies_removed
-
-
- def push_policy_update(self, policies_updated, policies_removed):
- """consolidate the new policies_updated, policies_removed to existing ones"""
- for policy_body in policies_updated:
- policy_name = policy_body.get(POLICY_NAME)
- policy = PolicyUtils.convert_to_policy(policy_body)
- if not policy:
- continue
- policy_id = policy.get(POLICY_ID)
-
- self._policies_updated[policy_id] = policy
-
- rm_policy_names = self._policies_removed.get(policy_id, {}).get(POLICY_NAMES)
- if rm_policy_names and policy_name in rm_policy_names:
- del rm_policy_names[policy_name]
-
- for policy_body in policies_removed:
- policy_name = policy_body.get(POLICY_NAME)
- policy = PolicyUtils.convert_to_policy(policy_body)
- if not policy:
- continue
- policy_id = policy.get(POLICY_ID)
-
- if policy_id in self._policies_removed:
- policy = self._policies_removed[policy_id]
-
- if POLICY_NAMES not in policy:
- policy[POLICY_NAMES] = {}
- policy[POLICY_NAMES][policy_name] = True
- self._policies_removed[policy_id] = policy
-
- req_message = ("policy-update notification - updated[{0}], removed[{1}]"
- .format(len(self._policies_updated),
- len(self._policies_removed)))
-
- if not self._audit:
- self._audit = Audit(job_name="policy_update",
- req_message=req_message,
- retry_get_config=True)
- else:
- self._audit.req_message = req_message
-
- self._logger.info(
- "pending(%s) for %s policies_updated %s policies_removed %s",
- self._audit.request_id, req_message,
- json.dumps(self._policies_updated), json.dumps(self._policies_removed))
-
+_LOGGER = Utils.get_logger(__file__)
class PolicyUpdater(Thread):
"""sequentially handle the policy-updates and catch-ups in its own policy_updater thread"""
- _logger = logging.getLogger("policy_handler.policy_updater")
-
def __init__(self, on_reconfigure_receiver):
"""init static config of PolicyUpdater."""
Thread.__init__(self, name="policy_updater", daemon=True)
@@ -132,7 +49,7 @@ class PolicyUpdater(Thread):
self._aud_shutdown = None
self._aud_catch_up = None
self._aud_reconfigure = None
- self._policy_update = _PolicyUpdate()
+ self._policy_updates = pdp_client.PolicyUpdates()
self._catch_up_interval = None
self._reconfigure_interval = None
@@ -151,7 +68,7 @@ class PolicyUpdater(Thread):
_, reconfigure = self._settings.get_by_key(Config.RECONFIGURE, {})
self._reconfigure_interval = reconfigure.get(Config.TIMER_INTERVAL) or 10*60
- PolicyUpdater._logger.info(
+ _LOGGER.info(
"intervals: catch_up(%s) reconfigure(%s): %s",
self._catch_up_interval, self._reconfigure_interval, self._settings)
self._settings.commit_change()
@@ -160,7 +77,7 @@ class PolicyUpdater(Thread):
def policy_update(self, policies_updated, policies_removed):
"""enqueue the policy-updates"""
with self._lock:
- self._policy_update.push_policy_update(policies_updated, policies_removed)
+ self._policy_updates.push_policy_updates(policies_updated, policies_removed)
self._run.set()
def catch_up(self, audit=None):
@@ -168,7 +85,7 @@ class PolicyUpdater(Thread):
with self._lock:
if not self._aud_catch_up:
self._aud_catch_up = audit or Audit(req_message=AUTO_CATCH_UP)
- PolicyUpdater._logger.info(
+ _LOGGER.info(
"catch_up %s request_id %s",
self._aud_catch_up.req_message, self._aud_catch_up.request_id
)
@@ -179,7 +96,7 @@ class PolicyUpdater(Thread):
with self._lock:
if not self._aud_reconfigure:
self._aud_reconfigure = audit or Audit(req_message=AUTO_RECONFIGURE)
- PolicyUpdater._logger.info(
+ _LOGGER.info(
"%s request_id %s",
self._aud_reconfigure.req_message, self._aud_reconfigure.request_id
)
@@ -187,10 +104,10 @@ class PolicyUpdater(Thread):
def run(self):
"""wait and run the policy-update in thread"""
- PolicyUpdater._logger.info("starting policy_updater...")
+ _LOGGER.info("starting policy_updater...")
self._run_reconfigure_timer()
while True:
- PolicyUpdater._logger.info("waiting for policy-updates...")
+ _LOGGER.info("waiting for policy-updates...")
self._run.wait()
with self._lock:
@@ -209,7 +126,7 @@ class PolicyUpdater(Thread):
self._on_policy_update()
- PolicyUpdater._logger.info("exit policy-updater")
+ _LOGGER.info("exit policy-updater")
def _keep_running(self):
"""thread-safe check whether to continue running"""
@@ -226,18 +143,15 @@ class PolicyUpdater(Thread):
return
if self._catch_up_timer:
- self._logger.info("next step catch_up_timer in %s", self._catch_up_interval)
+ _LOGGER.info("next step catch_up_timer in %s", self._catch_up_interval)
self._catch_up_timer.next(self._catch_up_interval)
return
self._catch_up_timer = StepTimer(
- "catch_up_timer",
- self._catch_up_interval,
- PolicyUpdater.catch_up,
- PolicyUpdater._logger,
- self
+ "catch_up_timer", self._catch_up_interval,
+ PolicyUpdater.catch_up, self
)
- self._logger.info("started catch_up_timer in %s", self._catch_up_interval)
+ _LOGGER.info("started catch_up_timer in %s", self._catch_up_interval)
self._catch_up_timer.start()
def _run_reconfigure_timer(self):
@@ -246,41 +160,38 @@ class PolicyUpdater(Thread):
return
if self._reconfigure_timer:
- self._logger.info("next step reconfigure_timer in %s", self._reconfigure_interval)
+ _LOGGER.info("next step reconfigure_timer in %s", self._reconfigure_interval)
self._reconfigure_timer.next(self._reconfigure_interval)
return
self._reconfigure_timer = StepTimer(
- "reconfigure_timer",
- self._reconfigure_interval,
- PolicyUpdater.reconfigure,
- PolicyUpdater._logger,
- self
+ "reconfigure_timer", self._reconfigure_interval,
+ PolicyUpdater.reconfigure, self
)
- self._logger.info("started reconfigure_timer in %s", self._reconfigure_interval)
+ _LOGGER.info("started reconfigure_timer in %s", self._reconfigure_interval)
self._reconfigure_timer.start()
def _pause_catch_up_timer(self):
"""pause catch_up_timer"""
if self._catch_up_timer:
- self._logger.info("pause catch_up_timer")
+ _LOGGER.info("pause catch_up_timer")
self._catch_up_timer.pause()
def _stop_timers(self):
"""stop and destroy the catch_up and reconfigure timers"""
if self._catch_up_timer:
- self._logger.info("stopping catch_up_timer")
+ _LOGGER.info("stopping catch_up_timer")
self._catch_up_timer.stop()
self._catch_up_timer.join()
self._catch_up_timer = None
- self._logger.info("stopped catch_up_timer")
+ _LOGGER.info("stopped catch_up_timer")
if self._reconfigure_timer:
- self._logger.info("stopping reconfigure_timer")
+ _LOGGER.info("stopping reconfigure_timer")
self._reconfigure_timer.stop()
self._reconfigure_timer.join()
self._reconfigure_timer = None
- self._logger.info("stopped reconfigure_timer")
+ _LOGGER.info("stopped reconfigure_timer")
def _on_reconfigure(self):
"""bring the latest config and reconfigure"""
@@ -296,7 +207,7 @@ class PolicyUpdater(Thread):
reconfigure_result = ""
try:
need_to_catch_up = False
- PolicyUpdater._logger.info(log_line)
+ _LOGGER.info(log_line)
active_prev = ServiceActivator.is_active_mode_of_operation(aud_reconfigure)
Config.discover(aud_reconfigure)
@@ -314,7 +225,7 @@ class PolicyUpdater(Thread):
if self._set_timer_intervals():
changed_configs.append("timer_intervals")
- if PolicyRest.reconfigure():
+ if pdp_client.PolicyRest.reconfigure():
need_to_catch_up = True
changed_configs.append(Config.FIELD_POLICY_ENGINE)
@@ -335,7 +246,7 @@ class PolicyUpdater(Thread):
Config.discovered_config.commit_change()
aud_reconfigure.audit_done(result=reconfigure_result)
- PolicyUpdater._logger.info(log_line + reconfigure_result)
+ _LOGGER.info(log_line + reconfigure_result)
if need_to_catch_up:
self._pause_catch_up_timer()
@@ -345,16 +256,15 @@ class PolicyUpdater(Thread):
error_msg = "crash {} {}{}: {}: {}".format(
"_on_reconfigure", log_line, reconfigure_result, type(ex).__name__, str(ex))
- PolicyUpdater._logger.exception(error_msg)
+ _LOGGER.exception(error_msg)
aud_reconfigure.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
aud_reconfigure.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
aud_reconfigure.audit_done(result=error_msg)
self._run_reconfigure_timer()
- PolicyUpdater._logger.info("policy_handler health: %s",
- json.dumps(aud_reconfigure.health(full=True)))
- PolicyUpdater._logger.info("process_info: %s", json.dumps(aud_reconfigure.process_info()))
+ _LOGGER.info("policy_handler health: %s", json.dumps(aud_reconfigure.health(full=True)))
+ _LOGGER.info("process_info: %s", json.dumps(aud_reconfigure.process_info()))
def _on_catch_up(self):
@@ -363,7 +273,7 @@ class PolicyUpdater(Thread):
aud_catch_up = self._aud_catch_up
if self._aud_catch_up:
self._aud_catch_up = None
- self._policy_update.reset()
+ self._policy_updates.reset()
if not aud_catch_up:
return False
@@ -374,12 +284,11 @@ class PolicyUpdater(Thread):
)
self._pause_catch_up_timer()
aud_catch_up.audit_done(result=catch_up_result)
- PolicyUpdater._logger.info(catch_up_result)
+ _LOGGER.info(catch_up_result)
self._run_catch_up_timer()
- PolicyUpdater._logger.info("policy_handler health: %s",
- json.dumps(aud_catch_up.health(full=True)))
- PolicyUpdater._logger.info("process_info: %s", json.dumps(aud_catch_up.process_info()))
+ _LOGGER.info("policy_handler health: %s", json.dumps(aud_catch_up.health(full=True)))
+ _LOGGER.info("process_info: %s", json.dumps(aud_catch_up.process_info()))
return False
log_line = "catch_up {0} request_id {1}".format(
@@ -388,25 +297,26 @@ class PolicyUpdater(Thread):
catch_up_result = ""
try:
not_found_ok = None
- PolicyUpdater._logger.info(log_line)
+ _LOGGER.info(log_line)
self._pause_catch_up_timer()
- _, policies, policy_filters = PolicyMatcher.get_deployed_policies(aud_catch_up)
+ (_, policies,
+ policy_filters) = pdp_client.PolicyMatcher.get_deployed_policies(aud_catch_up)
catch_up_message = None
if aud_catch_up.is_not_found():
not_found_ok = True
else:
- _, catch_up_message = PolicyMatcher.build_catch_up_message(
+ _, catch_up_message = pdp_client.PolicyMatcher.build_catch_up_message(
aud_catch_up, policies, policy_filters)
if not_found_ok:
catch_up_result = ("- not sending catch-up "
"- no deployed policies or policy-filters")
- PolicyUpdater._logger.warning(catch_up_result)
+ _LOGGER.warning(catch_up_result)
elif not (catch_up_message and aud_catch_up.is_success()):
catch_up_result = "- not sending catch-up to deployment-handler due to errors"
- PolicyUpdater._logger.warning(catch_up_result)
+ _LOGGER.warning(catch_up_result)
elif catch_up_message.empty():
catch_up_result = "- not sending empty catch-up to deployment-handler"
else:
@@ -414,18 +324,18 @@ class PolicyUpdater(Thread):
DeployHandler.policy_update(aud_catch_up, catch_up_message)
if not aud_catch_up.is_success():
catch_up_result = "- failed to send catch-up to deployment-handler"
- PolicyUpdater._logger.warning(catch_up_result)
+ _LOGGER.warning(catch_up_result)
else:
catch_up_result = "- sent catch-up to deployment-handler"
success, _, _ = aud_catch_up.audit_done(result=catch_up_result)
- PolicyUpdater._logger.info(log_line + " " + catch_up_result)
+ _LOGGER.info(log_line + " " + catch_up_result)
except Exception as ex:
error_msg = ("{0}: crash {1} {2} at {3}: {4}"
.format(aud_catch_up.request_id, type(ex).__name__, str(ex),
"on_catch_up", log_line + " " + catch_up_result))
- PolicyUpdater._logger.exception(error_msg)
+ _LOGGER.exception(error_msg)
aud_catch_up.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
aud_catch_up.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
aud_catch_up.audit_done(result=error_msg)
@@ -433,9 +343,8 @@ class PolicyUpdater(Thread):
self._run_catch_up_timer()
- PolicyUpdater._logger.info("policy_handler health: %s",
- json.dumps(aud_catch_up.health(full=True)))
- PolicyUpdater._logger.info("process_info: %s", json.dumps(aud_catch_up.process_info()))
+ _LOGGER.info("policy_handler health: %s", json.dumps(aud_catch_up.health(full=True)))
+ _LOGGER.info("process_info: %s", json.dumps(aud_catch_up.process_info()))
return success
@@ -443,42 +352,39 @@ class PolicyUpdater(Thread):
"""handle the event of policy-updates"""
result = ""
with self._lock:
- audit, policies_updated, policies_removed = self._policy_update.pop_policy_update()
+ audit, policies_updated, policies_removed = self._policy_updates.pop_policy_updates()
if not audit:
return
log_line = "request_id: {} policies_updated: {} policies_removed: {}".format(
audit.request_id, json.dumps(policies_updated), json.dumps(policies_removed))
- PolicyUpdater._logger.info(log_line)
+ _LOGGER.info(log_line)
try:
not_found_ok = None
(updated_policies, removed_policies,
- policy_filter_matches) = PolicyMatcher.match_to_deployed_policies(
+ policy_filter_matches) = pdp_client.PolicyMatcher.match_to_deployed_policies(
audit, policies_updated, policies_removed)
if audit.is_not_found():
not_found_ok = True
elif updated_policies or removed_policies:
- updated_policies, removed_policies = PolicyRest.get_latest_updated_policies(
- (audit,
- [(policy_id, policy.get(POLICY_BODY, {}).get(POLICY_VERSION))
- for policy_id, policy in updated_policies.items()],
- [(policy_id, policy.get(POLICY_NAMES, {}))
- for policy_id, policy in removed_policies.items()]
- ))
+ (updated_policies,
+ removed_policies) = pdp_client.PolicyRest.get_latest_updated_policies(
+ audit, updated_policies, removed_policies
+ )
if not_found_ok:
result = ("- not sending policy-updates to deployment-handler "
"- no deployed policies or policy-filters")
- PolicyUpdater._logger.warning(result)
+ _LOGGER.warning(result)
elif not audit.is_success():
result = "- not sending policy-updates to deployment-handler due to errors"
- PolicyUpdater._logger.warning(result)
+ _LOGGER.warning(result)
elif not updated_policies and not removed_policies:
result = "- not sending empty policy-updates to deployment-handler"
- PolicyUpdater._logger.info(result)
+ _LOGGER.info(result)
else:
message = PolicyUpdateMessage(updated_policies, removed_policies,
policy_filter_matches, False)
@@ -493,19 +399,19 @@ class PolicyUpdater(Thread):
log_line = "request_id[{}]: {}".format(audit.request_id, str(message))
if not audit.is_success():
result = "- failed to send to deployment-handler {}".format(log_updates)
- PolicyUpdater._logger.warning(result)
+ _LOGGER.warning(result)
else:
result = "- sent to deployment-handler {}".format(log_updates)
audit.audit_done(result=result)
- PolicyUpdater._logger.info(log_line + " " + result)
+ _LOGGER.info(log_line + " " + result)
except Exception as ex:
error_msg = ("{0}: crash {1} {2} at {3}: {4}"
.format(audit.request_id, type(ex).__name__, str(ex),
"on_policies_update", log_line + " " + result))
- PolicyUpdater._logger.exception(error_msg)
+ _LOGGER.exception(error_msg)
audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
@@ -517,7 +423,7 @@ class PolicyUpdater(Thread):
def shutdown(self, audit):
"""Shutdown the policy-updater"""
- PolicyUpdater._logger.info("shutdown policy-updater")
+ _LOGGER.info("shutdown policy-updater")
with self._lock:
self._aud_shutdown = audit
self._run.set()
diff --git a/policyhandler/service_activator.py b/policyhandler/service_activator.py
index d51d11c..9c8a1b2 100644
--- a/policyhandler/service_activator.py
+++ b/policyhandler/service_activator.py
@@ -14,7 +14,6 @@
# limitations under the License.
# ============LICENSE_END=========================================================
#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
"""
ask service_activator for the mode_of_operation
@@ -30,23 +29,25 @@
"""
import json
-import logging
from copy import deepcopy
from urllib.parse import urljoin
import requests
from .config import Config, Settings
-from .onap.audit import REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, Metrics
+from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode,
+ Metrics)
from .policy_consts import TARGET_ENTITY
+from .utils import Utils
+_LOGGER = Utils.get_logger(__file__)
class ServiceActivator(object):
"""calling the service_activator web api to determine the mode_of_operation"""
- _logger = logging.getLogger("policy_handler.service_activator")
DEFAULT_TARGET_ENTITY = "service_activator"
DEFAULT_TIMEOUT_IN_SECS = 10
MODE_OF_OPERATION_ACTIVE = "active"
+ SERVICE_MODE = "service_mode"
_lazy_inited = False
_settings = Settings(Config.MODE_OF_OPERATION, Config.SERVICE_ACTIVATOR)
@@ -81,6 +82,7 @@ class ServiceActivator(object):
"""
ServiceActivator._custom_kwargs = {}
ServiceActivator._url = ServiceActivator._url_register = ""
+ Audit.register_item_health(ServiceActivator.SERVICE_MODE, ServiceActivator._get_service_mode)
try:
_, ServiceActivator._mode_of_operation = ServiceActivator._settings.get_by_key(
@@ -98,7 +100,7 @@ class ServiceActivator(object):
tls_ca_mode = config_sa.get(Config.TLS_CA_MODE)
ServiceActivator._custom_kwargs = Config.get_requests_kwargs(tls_ca_mode)
- ServiceActivator._logger.info(audit.info(
+ _LOGGER.info(audit.info(
"dns based routing to %s: url(%s) tls_ca_mode(%s) custom_kwargs(%s)",
ServiceActivator._target_entity, ServiceActivator._url_register,
tls_ca_mode, json.dumps(ServiceActivator._custom_kwargs)))
@@ -134,17 +136,31 @@ class ServiceActivator(object):
ServiceActivator._init(audit)
@staticmethod
- def is_active_mode_of_operation(audit):
+ def _get_service_mode():
+ """returns the service_mode as json to be reported by the healthcheck"""
+ return {
+ "is_active_mode_of_operation": ServiceActivator.is_active_mode_of_operation(),
+ "is_pdp_api_default": Config.is_pdp_api_default(log_status=False)
+ }
+
+ @staticmethod
+ def is_active_mode_of_operation(audit=None):
"""
mode_of_operation - whether the service is
active == True or passive == False
based on the current value of the mode_of_operation
+
+ temporary for R4 Dublin - passive for new PDP API
"""
active = (ServiceActivator._mode_of_operation is None
or ServiceActivator._mode_of_operation
== ServiceActivator.MODE_OF_OPERATION_ACTIVE)
- ServiceActivator._logger.info(audit.info(
- "mode_of_operation = {} active = {}".format(
+
+ if active and Config.is_pdp_api_default():
+ active = False
+
+ if audit:
+ _LOGGER.info(audit.info("mode_of_operation = {} active = {}".format(
ServiceActivator._mode_of_operation, active)))
return active
@@ -157,8 +173,7 @@ class ServiceActivator(object):
target_entity = ServiceActivator._target_entity
if not ServiceActivator._url:
- ServiceActivator._logger.info(audit.info(
- "no url found for {}".format(target_entity)))
+ _LOGGER.info(audit.info("no url found for {}".format(target_entity)))
return ServiceActivator.is_active_mode_of_operation(audit)
url = ServiceActivator._url_register
@@ -177,7 +192,7 @@ class ServiceActivator(object):
json.dumps(custom_kwargs))
log_line = log_action + " " + log_data
- ServiceActivator._logger.info(log_line)
+ _LOGGER.info(log_line)
metrics.metrics_start(log_line)
res = None
@@ -190,7 +205,7 @@ class ServiceActivator(object):
else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
error_msg = "failed to {} {}: {} {}".format(
log_action, type(ex).__name__, str(ex), log_data)
- ServiceActivator._logger.exception(error_msg)
+ _LOGGER.exception(error_msg)
metrics.set_http_status_code(error_code)
audit.set_http_status_code(error_code)
metrics.metrics(error_msg)
@@ -204,7 +219,7 @@ class ServiceActivator(object):
metrics.metrics(log_line)
if res.status_code != requests.codes.ok:
- ServiceActivator._logger.error(log_line)
+ _LOGGER.error(log_line)
return ServiceActivator.is_active_mode_of_operation(audit)
result = res.json() or {}
diff --git a/policyhandler/step_timer.py b/policyhandler/step_timer.py
index 0f4f8e4..5ed4af5 100644
--- a/policyhandler/step_timer.py
+++ b/policyhandler/step_timer.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,7 +14,6 @@
# limitations under the License.
# ============LICENSE_END=========================================================
#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
"""periodically callback"""
@@ -22,6 +21,9 @@ import json
from datetime import datetime
from threading import Event, RLock, Thread
+from .utils import Utils
+
+_LOGGER = Utils.get_logger(__file__)
class StepTimer(Thread):
"""call on_time after interval number of seconds, then wait to continue"""
@@ -32,12 +34,11 @@ class StepTimer(Thread):
STATE_STOPPING = "stopping"
STATE_STOPPED = "stopped"
- def __init__(self, name, interval, on_time, logger, *args, **kwargs):
+ def __init__(self, name, interval, on_time, *args, **kwargs):
"""create step timer with controlled start. next step and pause"""
Thread.__init__(self, name=name)
self._interval = interval
self._on_time = on_time
- self._logger = logger
self._args = args
self._kwargs = kwargs
@@ -110,8 +111,8 @@ class StepTimer(Thread):
utcnow = datetime.utcnow()
self._req_time = (utcnow - self._req_ts).total_seconds()
self._req_ts = utcnow
- self._logger.info("{0}[{1}] {2}->{3}".format(
- self.name, self._req_time, prev_req, self.get_timer_status()))
+ _LOGGER.info("{}[{}] {}->{}".format(self.name, self._req_time, prev_req,
+ self.get_timer_status()))
def _log_substep(self, substep):
"""log timer substep"""
@@ -120,7 +121,8 @@ class StepTimer(Thread):
utcnow = datetime.utcnow()
self._substep_time = (utcnow - self._substep_ts).total_seconds()
self._substep_ts = utcnow
- self._logger.info("[{0}] {1}".format(self._substep_time, self.get_timer_status()))
+ _LOGGER.info("{}[{}] {}".format(self.name, self._substep_time,
+ self.get_timer_status()))
def _on_time_event(self):
"""execute the _on_time event"""
@@ -135,7 +137,7 @@ class StepTimer(Thread):
error_msg = ("{0}: crash {1} {2} at {3}: args({4}), kwargs({5})"
.format(self.name, type(ex).__name__, str(ex), "_on_time",
json.dumps(self._args), json.dumps(self._kwargs)))
- self._logger.exception(error_msg)
+ _LOGGER.exception(error_msg)
def run(self):
"""loop one step a time until stopped=finished"""
diff --git a/policyhandler/policy_utils.py b/policyhandler/utils.py
index 08d26f0..d728e48 100644
--- a/policyhandler/policy_utils.py
+++ b/policyhandler/utils.py
@@ -1,5 +1,5 @@
# ================================================================================
-# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,121 +14,44 @@
# limitations under the License.
# ============LICENSE_END=========================================================
#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-"""utils for policy usage and conversions"""
+"""utils and conversions"""
import json
import logging
-import re
+import os
from copy import deepcopy
from typing import Pattern
-from .policy_consts import (POLICY_BODY, POLICY_CONFIG, POLICY_ID, POLICY_NAME,
- POLICY_VERSION)
+class ToBeImplementedException(Exception):
+ """exception for to be implemented features of policy-handler"""
+ pass
-class PolicyUtils(object):
- """policy-client utils"""
- _logger = logging.getLogger("policy_handler.policy_utils")
- _policy_name_ext = re.compile('[.][0-9]+[.][a-zA-Z]+$')
-
- @staticmethod
- def extract_policy_id(policy_name):
- """ policy_name = policy_id + "." + <version> + "." + <extension>
- For instance,
- policy_name = DCAE_alex.Config_alex_policy_number_1.3.xml
- policy_id = DCAE_alex.Config_alex_policy_number_1
- policy_scope = DCAE_alex
- policy_class = Config
- policy_version = 3
- type = extension = xml
- delimiter = "."
- policy_class_delimiter = "_"
- policy_name in PAP = DCAE_alex.alex_policy_number_1
- """
- if not policy_name:
- return
- return PolicyUtils._policy_name_ext.sub('', policy_name)
-
- @staticmethod
- def parse_policy_config(policy):
- """try parsing the config in policy."""
- if not policy:
- return policy
- config = policy.get(POLICY_BODY, {}).get(POLICY_CONFIG)
- if config:
- policy[POLICY_BODY][POLICY_CONFIG] = Utils.safe_json_parse(config)
- return policy
-
- @staticmethod
- def convert_to_policy(policy_body):
- """wrap policy_body received from policy-engine with policy_id."""
- if not policy_body:
- return None
- policy_name = policy_body.get(POLICY_NAME)
- policy_version = policy_body.get(POLICY_VERSION)
- if not policy_name or not policy_version:
- return None
- policy_id = PolicyUtils.extract_policy_id(policy_name)
- if not policy_id:
- return None
- return {POLICY_ID:policy_id, POLICY_BODY:policy_body}
-
- @staticmethod
- def select_latest_policy(policy_bodies, expected_versions=None, ignore_policy_names=None):
- """For some reason, the policy-engine returns all version of the policy_bodies.
- DCAE-Controller is only interested in the latest version
- """
- if not policy_bodies:
- return
- latest_policy_body = {}
- for policy_body in policy_bodies:
- policy_name = policy_body.get(POLICY_NAME)
- policy_version = policy_body.get(POLICY_VERSION)
- if not policy_name or not policy_version or not policy_version.isdigit():
- continue
- if expected_versions and policy_version not in expected_versions:
- continue
- if ignore_policy_names and policy_name in ignore_policy_names:
- continue
-
- if (not latest_policy_body
- or int(latest_policy_body[POLICY_VERSION]) < int(policy_version)):
- latest_policy_body = policy_body
-
- return PolicyUtils.parse_policy_config(PolicyUtils.convert_to_policy(latest_policy_body))
-
- @staticmethod
- def select_latest_policies(policy_bodies):
- """For some reason, the policy-engine returns all version of the policy_bodies.
- DCAE-Controller is only interested in the latest versions
- """
- if not policy_bodies:
- return {}
- policies = {}
- for policy_body in policy_bodies:
- policy = PolicyUtils.convert_to_policy(policy_body)
- if not policy:
- continue
- policy_id = policy.get(POLICY_ID)
- policy_version = policy.get(POLICY_BODY, {}).get(POLICY_VERSION)
- if not policy_id or not policy_version or not policy_version.isdigit():
- continue
- if (policy_id not in policies
- or int(policy_version) > int(policies[policy_id][POLICY_BODY][POLICY_VERSION])):
- policies[policy_id] = policy
-
- for policy_id in policies:
- policies[policy_id] = PolicyUtils.parse_policy_config(policies[policy_id])
-
- return policies
-
class Utils(object):
"""general purpose utils"""
_logger = logging.getLogger("policy_handler.utils")
@staticmethod
+ def get_logger(file_path):
+ """get the logger for the file_path == __file__"""
+ logger_path = []
+ file_path = os.path.realpath(file_path)
+ logger_path.append(os.path.basename(file_path)[:-3])
+ while file_path:
+ file_path = os.path.dirname(file_path)
+ folder_name = os.path.basename(file_path)
+ if folder_name == "policyhandler" or len(logger_path) > 5:
+ break
+ if folder_name == "tests":
+ logger_path.append("unit_test")
+ break
+ logger_path.append(folder_name)
+
+ logger_path.append("policy_handler")
+ return logging.getLogger(".".join(reversed(logger_path)))
+
+ @staticmethod
def safe_json_parse(json_str):
"""try parsing json without exception - returns the json_str back if fails"""
if not json_str:
diff --git a/policyhandler/web_server.py b/policyhandler/web_server.py
index dc76353..dfd1b51 100644
--- a/policyhandler/web_server.py
+++ b/policyhandler/web_server.py
@@ -14,29 +14,27 @@
# limitations under the License.
# ============LICENSE_END=========================================================
#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
"""web-server for policy_handler"""
import json
-import logging
from datetime import datetime
import cherrypy
+from . import pdp_client
from .config import Config
from .deploy_handler import PolicyUpdateMessage
from .onap.audit import Audit, AuditHttpCode
-from .policy_matcher import PolicyMatcher
from .policy_receiver import PolicyReceiver
-from .policy_rest import PolicyRest
+from .utils import Utils
class PolicyWeb(object):
"""run http API of policy-handler on 0.0.0.0:wservice_port - any incoming address"""
DATA_NOT_FOUND_ERROR = 404
HOST_INADDR_ANY = ".".join("0"*4)
- logger = logging.getLogger("policy_handler.policy_web")
+ logger = Utils.get_logger(__file__)
@staticmethod
def run_forever(audit):
@@ -84,7 +82,8 @@ class _PolicyWeb(object):
PolicyWeb.logger.info("%s policy_id=%s headers=%s",
req_info, policy_id, json.dumps(cherrypy.request.headers))
- latest_policy = PolicyRest.get_latest_policy((audit, policy_id, None, None)) or {}
+ latest_policy = pdp_client.PolicyRest.get_latest_policy(
+ (audit, policy_id, None, None)) or {}
PolicyWeb.logger.info("res %s policy_id=%s latest_policy=%s",
req_info, policy_id, json.dumps(latest_policy))
@@ -104,9 +103,9 @@ class _PolicyWeb(object):
PolicyWeb.logger.info("%s", req_info)
- result, policies, policy_filters = PolicyMatcher.get_deployed_policies(audit)
+ result, policies, policy_filters = pdp_client.PolicyMatcher.get_deployed_policies(audit)
if not result:
- result, policy_update = PolicyMatcher.build_catch_up_message(
+ result, policy_update = pdp_client.PolicyMatcher.build_catch_up_message(
audit, policies, policy_filters)
if policy_update and isinstance(policy_update, PolicyUpdateMessage):
result["policy_update"] = policy_update.get_message()
@@ -168,6 +167,9 @@ class _PolicyWeb(object):
}
}
"""
+ if Config.is_pdp_api_default():
+ raise cherrypy.HTTPError(404, "temporarily unsupported due to the new pdp API")
+
if cherrypy.request.method == "GET":
return self._get_all_policies_latest()
@@ -184,7 +186,7 @@ class _PolicyWeb(object):
PolicyWeb.logger.info("%s: policy_filter=%s headers=%s",
req_info, str_policy_filter, json.dumps(cherrypy.request.headers))
- result = PolicyRest.get_latest_policies(audit, policy_filter=policy_filter) or {}
+ result = pdp_client.PolicyRest.get_latest_policies(audit, policy_filter=policy_filter) or {}
result_str = json.dumps(result, sort_keys=True)
PolicyWeb.logger.info("result %s: policy_filter=%s result=%s",