diff options
Diffstat (limited to 'policyhandler')
27 files changed, 1492 insertions, 752 deletions
diff --git a/policyhandler/__main__.py b/policyhandler/__main__.py index 798a2e1..6a71a15 100644 --- a/policyhandler/__main__.py +++ b/policyhandler/__main__.py @@ -14,7 +14,6 @@ # limitations under the License. # ============LICENSE_END========================================================= # -# ECOMP is a trademark and service mark of AT&T Intellectual Property. """ run as server: @@ -23,22 +22,23 @@ that will invoke this module __main__.py in folder of policyhandler """ -import logging import sys -from policyhandler import LogWriter from policyhandler.config import Config from policyhandler.onap.audit import Audit -from policyhandler.policy_receiver import PolicyReceiver -from policyhandler.service_activator import ServiceActivator -from policyhandler.web_server import PolicyWeb +from policyhandler.utils import Utils def run_policy_handler(): """main run function for policy-handler""" Config.init_config() - logger = logging.getLogger("policy_handler") + from policyhandler import LogWriter + from policyhandler.policy_receiver import PolicyReceiver + from policyhandler.service_activator import ServiceActivator + from policyhandler.web_server import PolicyWeb + + logger = Utils.get_logger(__file__) sys.stdout = LogWriter(logger.info) sys.stderr = LogWriter(logger.error) diff --git a/policyhandler/config.py b/policyhandler/config.py index e6e74cc..f8c425a 100644 --- a/policyhandler/config.py +++ b/policyhandler/config.py @@ -14,7 +14,6 @@ # limitations under the License. # ============LICENSE_END========================================================= # -# ECOMP is a trademark and service mark of AT&T Intellectual Property. """read and use the config""" @@ -25,7 +24,7 @@ import logging.config import os from .onap.audit import Audit -from .policy_utils import Utils +from .utils import Utils LOGS_DIR = 'logs' @@ -40,6 +39,8 @@ logging.basicConfig( '%(threadName)s %(name)s.%(funcName)s: %(message)s'), datefmt='%Y%m%d_%H%M%S', level=logging.DEBUG) +_LOGGER = Utils.get_logger(__file__) + class Settings(object): """settings of module or an application that is the config filtered by the collection of config-keys. @@ -127,7 +128,6 @@ class Settings(object): class Config(object): """main config of the application""" - _logger = logging.getLogger("policy_handler.config") CONFIG_FILE_PATH = "etc/config.json" LOGGER_CONFIG_FILE_PATH = "etc/common_logger.config" SERVICE_NAME_POLICY_HANDLER = "policy_handler" @@ -154,9 +154,11 @@ class Config(object): DEFAULT_TIMEOUT_IN_SECS = 60 SERVICE_ACTIVATOR = "service_activator" MODE_OF_OPERATION = "mode_of_operation" + PDP_API_VERSION = "PDP_API_VERSION" system_name = SERVICE_NAME_POLICY_HANDLER wservice_port = 25577 + _pdp_api_version = os.environ.get(PDP_API_VERSION) consul_url = "http://consul:8500" consul_timeout_in_secs = DEFAULT_TIMEOUT_IN_SECS tls_cacert_file = None @@ -175,7 +177,7 @@ class Config(object): return None tls_file_path = os.path.join(cert_directory, file_name) if not os.path.isfile(tls_file_path) or not os.access(tls_file_path, os.R_OK): - Config._logger.error("invalid %s: %s", tls_name, tls_file_path) + _LOGGER.error("invalid %s: %s", tls_name, tls_file_path) return None return tls_file_path @@ -189,19 +191,19 @@ class Config(object): Config.tls_server_ca_chain_file = None if not (tls_config and isinstance(tls_config, dict)): - Config._logger.info("no tls in config: %s", json.dumps(tls_config)) + _LOGGER.info("no tls in config: %s", json.dumps(tls_config)) return cert_directory = tls_config.get("cert_directory") if not (cert_directory and isinstance(cert_directory, str)): - Config._logger.warning("unexpected tls.cert_directory: %r", cert_directory) + _LOGGER.warning("unexpected tls.cert_directory: %r", cert_directory) return cert_directory = os.path.join( os.path.dirname(os.path.dirname(os.path.realpath(__file__))), cert_directory) if not (cert_directory and os.path.isdir(cert_directory)): - Config._logger.warning("ignoring invalid cert_directory: %s", cert_directory) + _LOGGER.warning("ignoring invalid cert_directory: %s", cert_directory) return Config.tls_cacert_file = Config._get_tls_file_path(tls_config, cert_directory, "cacert") @@ -213,16 +215,16 @@ class Config(object): "server_ca_chain") finally: - Config._logger.info("tls_cacert_file = %s", Config.tls_cacert_file) - Config._logger.info("tls_server_cert_file = %s", Config.tls_server_cert_file) - Config._logger.info("tls_private_key_file = %s", Config.tls_private_key_file) - Config._logger.info("tls_server_ca_chain_file = %s", Config.tls_server_ca_chain_file) + _LOGGER.info("tls_cacert_file = %s", Config.tls_cacert_file) + _LOGGER.info("tls_server_cert_file = %s", Config.tls_server_cert_file) + _LOGGER.info("tls_private_key_file = %s", Config.tls_private_key_file) + _LOGGER.info("tls_server_ca_chain_file = %s", Config.tls_server_ca_chain_file) @staticmethod def init_config(file_path=None): """read and store the config from config file""" if Config._local_config.is_loaded(): - Config._logger.info("config already inited: %s", Config._local_config) + _LOGGER.info("config already inited: %s", Config._local_config) return if not file_path: @@ -234,10 +236,10 @@ class Config(object): loaded_config = json.load(config_json) if not loaded_config: - Config._logger.warning("config not loaded from file: %s", file_path) + _LOGGER.warning("config not loaded from file: %s", file_path) return - Config._logger.info("config loaded from file: %s", file_path) + _LOGGER.info("config loaded from file(%s): %s", file_path, Audit.json_dumps(loaded_config)) logging_config = loaded_config.get("logging") if logging_config: logging.config.dictConfig(logging_config) @@ -249,13 +251,15 @@ class Config(object): if not Config.consul_timeout_in_secs or Config.consul_timeout_in_secs < 1: Config.consul_timeout_in_secs = Config.DEFAULT_TIMEOUT_IN_SECS + Config._pdp_api_version = os.environ.get( + Config.PDP_API_VERSION, loaded_config.get(Config.PDP_API_VERSION.lower())) + local_config = loaded_config.get(Config.SERVICE_NAME_POLICY_HANDLER, {}) Config.system_name = local_config.get(Config.FIELD_SYSTEM, Config.system_name) Config._set_tls_config(local_config.get(Config.FIELD_TLS)) Config._local_config.set_config(local_config, auto_commit=True) - Config._logger.info("config loaded from file(%s): %s", file_path, Config._local_config) @staticmethod def discover(audit): @@ -265,14 +269,14 @@ class Config(object): new_config = DiscoveryClient.get_value(audit, discovery_key) if not new_config or not isinstance(new_config, dict): - Config._logger.warning("unexpected config from discovery: %s", new_config) + _LOGGER.warning("unexpected config from discovery: %s", new_config) return - Config._logger.debug("loaded config from discovery(%s): %s", - discovery_key, Audit.json_dumps(new_config)) + _LOGGER.debug("loaded config from discovery(%s): %s", + discovery_key, Audit.json_dumps(new_config)) Config.discovered_config.set_config(new_config.get(Config.SERVICE_NAME_POLICY_HANDLER)) - Config._logger.info("config from discovery: %s", Config.discovered_config) + _LOGGER.info("config from discovery: %s", Config.discovered_config) @staticmethod @@ -302,3 +306,11 @@ class Config(object): def get_requests_kwargs(tls_ca_mode=None): """generate kwargs with verify for requests based on the tls_ca_mode""" return {Config.REQUESTS_VERIFY: Config.get_tls_verify(tls_ca_mode)} + + @staticmethod + def is_pdp_api_default(log_status=True): + """whether to use the old (2018) or the default pdp API (started in 2019)""" + is_default = (Config._pdp_api_version is None) + if log_status: + _LOGGER.info("_pdp_api_version(%s) default(%s)", Config._pdp_api_version, is_default) + return is_default diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py index 0ffacba..a127e54 100644 --- a/policyhandler/deploy_handler.py +++ b/policyhandler/deploy_handler.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,12 +14,10 @@ # limitations under the License. # ============LICENSE_END========================================================= # -# ECOMP is a trademark and service mark of AT&T Intellectual Property. """send policy-update notification to deployment-handler""" import json -import logging from copy import copy, deepcopy from threading import Lock @@ -32,7 +30,9 @@ from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, from .policy_consts import (CATCH_UP, LATEST_POLICIES, POLICIES, POLICY_FILTER_MATCHES, POLICY_FILTERS, REMOVED_POLICIES, TARGET_ENTITY) +from .utils import Utils +_LOGGER = Utils.get_logger(__file__) class PolicyUpdateMessage(object): """class for messages to deployment-handler on policy-update""" @@ -143,7 +143,6 @@ class PolicyUpdateMessage(object): class DeployHandler(object): """calling the deployment-handler web apis""" - _logger = logging.getLogger("policy_handler.deploy_handler") DEFAULT_TARGET_ENTITY = "deployment_handler" DEFAULT_TIMEOUT_IN_SECS = 60 @@ -202,7 +201,7 @@ class DeployHandler(object): tls_ca_mode = config_dh.get(Config.TLS_CA_MODE) DeployHandler._custom_kwargs = Config.get_requests_kwargs(tls_ca_mode) - DeployHandler._logger.info( + _LOGGER.info( "dns based routing to %s: url(%s) tls_ca_mode(%s) custom_kwargs(%s)", DeployHandler._target_entity, DeployHandler._url, tls_ca_mode, json.dumps(DeployHandler._custom_kwargs)) @@ -221,8 +220,8 @@ class DeployHandler(object): DeployHandler._target_entity) DeployHandler._url_policy = str(DeployHandler._url or "") + '/policy' - DeployHandler._logger.info("got %s policy url(%s): %s", DeployHandler._target_entity, - DeployHandler._url_policy, DeployHandler._settings) + _LOGGER.info("got %s policy url(%s): %s", DeployHandler._target_entity, + DeployHandler._url_policy, DeployHandler._settings) DeployHandler._settings.commit_change() DeployHandler._lazy_inited = bool(DeployHandler._url) @@ -310,12 +309,12 @@ class DeployHandler(object): json.dumps(params), timeout_in_secs, json.dumps(custom_kwargs)) log_line = log_action + " " + log_data - DeployHandler._logger.info(log_line) + _LOGGER.info(log_line) metrics.metrics_start(log_line) if not DeployHandler._url: error_msg = "no url found to {0}".format(log_line) - DeployHandler._logger.error(error_msg) + _LOGGER.error(error_msg) metrics.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value) audit.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value) metrics.metrics(error_msg) @@ -331,7 +330,7 @@ class DeployHandler(object): else AuditHttpCode.SERVER_INTERNAL_ERROR.value) error_msg = "failed to {} {}: {} {}".format( log_action, type(ex).__name__, str(ex), log_data) - DeployHandler._logger.exception(error_msg) + _LOGGER.exception(error_msg) metrics.set_http_status_code(error_code) audit.set_http_status_code(error_code) metrics.metrics(error_msg) @@ -345,10 +344,10 @@ class DeployHandler(object): metrics.metrics(log_line) if res.status_code != requests.codes.ok: - DeployHandler._logger.error(log_line) + _LOGGER.error(log_line) return - DeployHandler._logger.info(log_line) + _LOGGER.info(log_line) result = res.json() or {} DeployHandler._server_instance_changed(result, metrics) @@ -379,12 +378,12 @@ class DeployHandler(object): json.dumps(headers), json.dumps(params), timeout_in_secs, json.dumps(custom_kwargs)) log_line = log_action + " " + log_data - DeployHandler._logger.info(log_line) + _LOGGER.info(log_line) metrics.metrics_start(log_line) if not DeployHandler._url: error_msg = "no url found to {}".format(log_line) - DeployHandler._logger.error(error_msg) + _LOGGER.error(error_msg) metrics.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value) audit.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value) metrics.metrics(error_msg) @@ -400,7 +399,7 @@ class DeployHandler(object): else AuditHttpCode.SERVER_INTERNAL_ERROR.value) error_msg = "failed to {} {}: {} {}".format( log_action, type(ex).__name__, str(ex), log_data) - DeployHandler._logger.exception(error_msg) + _LOGGER.exception(error_msg) metrics.set_http_status_code(error_code) audit.set_http_status_code(error_code) metrics.metrics(error_msg) @@ -414,7 +413,7 @@ class DeployHandler(object): metrics.metrics(log_line) if res.status_code != requests.codes.ok: - DeployHandler._logger.error(log_line) + _LOGGER.error(log_line) return None, None result = res.json() or {} @@ -424,12 +423,12 @@ class DeployHandler(object): policy_filters = result.get(POLICY_FILTERS, {}) if not policies and not policy_filters: audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value) - DeployHandler._logger.warning(audit.warn( + _LOGGER.warning(audit.warn( "found no deployed policies or policy-filters: {}".format(log_line), error_code=AuditResponseCode.DATA_ERROR)) return policies, policy_filters - DeployHandler._logger.info(log_line) + _LOGGER.info(log_line) return policies, policy_filters @staticmethod @@ -442,6 +441,6 @@ class DeployHandler(object): and prev_server_instance_uuid != DeployHandler._server_instance_uuid): DeployHandler.server_instance_changed = True - DeployHandler._logger.info(metrics.info( + _LOGGER.info(metrics.info( "deployment_handler_changed: {1} != {0}" .format(prev_server_instance_uuid, DeployHandler._server_instance_uuid))) diff --git a/policyhandler/discovery.py b/policyhandler/discovery.py index 4c5b64e..83f54ac 100644 --- a/policyhandler/discovery.py +++ b/policyhandler/discovery.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,20 +14,20 @@ # limitations under the License. # ============LICENSE_END========================================================= # -# ECOMP is a trademark and service mark of AT&T Intellectual Property. """client to talk to consul services and kv""" import base64 import json -import logging import requests from .config import Config from .customize import CustomizerUser from .onap.audit import AuditHttpCode, Metrics +from .utils import Utils +_LOGGER = Utils.get_logger(__file__) class DiscoveryClient(object): """talking to consul at Config.consul_url @@ -51,13 +51,12 @@ class DiscoveryClient(object): CONSUL_ENTITY = "consul" CONSUL_SERVICE_MASK = "{}/v1/catalog/service/{}" CONSUL_KV_MASK = "{}/v1/kv/{}" - _logger = logging.getLogger("policy_handler.discovery") @staticmethod def _discover_service(audit, service_name, service_path): """find the service record in consul""" response = requests.get(service_path, timeout=Config.consul_timeout_in_secs) - DiscoveryClient._logger.info(audit.info("response {} from {}: {}".format( + _LOGGER.info(audit.info("response {} from {}: {}".format( response.status_code, service_path, response.text))) response.raise_for_status() @@ -75,7 +74,7 @@ class DiscoveryClient(object): log_line = "get from {} at {}".format(DiscoveryClient.CONSUL_ENTITY, service_path) - DiscoveryClient._logger.info(metrics.metrics_start(log_line)) + _LOGGER.info(metrics.metrics_start(log_line)) status_code = None try: (status_code, @@ -86,7 +85,7 @@ class DiscoveryClient(object): else AuditHttpCode.SERVER_INTERNAL_ERROR.value) error_msg = ("failed {}/{} to {} {}: {}".format(status_code, error_code, log_line, type(ex).__name__, str(ex))) - DiscoveryClient._logger.exception(error_msg) + _LOGGER.exception(error_msg) metrics.set_http_status_code(error_code) audit.set_http_status_code(error_code) metrics.metrics(error_msg) @@ -95,15 +94,14 @@ class DiscoveryClient(object): if not service_url: error_code = AuditHttpCode.DATA_ERROR.value error_msg = "failed {}/{} to {}".format(status_code, error_code, log_line) - DiscoveryClient._logger.error(audit.error(error_msg)) + _LOGGER.error(audit.error(error_msg)) metrics.set_http_status_code(error_code) audit.set_http_status_code(error_code) metrics.metrics(error_msg) return None log_line = "response {} {}".format(status_code, log_line) - DiscoveryClient._logger.info(audit.info("got service_url: {} after {}" - .format(service_url, log_line))) + _LOGGER.info(audit.info("got service_url: {} after {}".format(service_url, log_line))) metrics.set_http_status_code(status_code) audit.set_http_status_code(status_code) @@ -128,7 +126,7 @@ class DiscoveryClient(object): log_line = "get from {} at {}".format(DiscoveryClient.CONSUL_ENTITY, discovery_url) - DiscoveryClient._logger.info(metrics.metrics_start(log_line)) + _LOGGER.info(metrics.metrics_start(log_line)) status_code = None try: status_code, value = DiscoveryClient._get_value_from_kv(discovery_url) @@ -138,7 +136,7 @@ class DiscoveryClient(object): else AuditHttpCode.SERVER_INTERNAL_ERROR.value) error_msg = ("failed {}/{} to {} {}: {}".format(status_code, error_code, log_line, type(ex).__name__, str(ex))) - DiscoveryClient._logger.exception(error_msg) + _LOGGER.exception(error_msg) metrics.set_http_status_code(error_code) audit.set_http_status_code(error_code) metrics.metrics(error_msg) diff --git a/policyhandler/onap/audit.py b/policyhandler/onap/audit.py index db4498a..3c09c16 100644 --- a/policyhandler/onap/audit.py +++ b/policyhandler/onap/audit.py @@ -14,7 +14,6 @@ # limitations under the License. # ============LICENSE_END========================================================= # -# ECOMP is a trademark and service mark of AT&T Intellectual Property. """generic class to keep track of request handling from receiving it through reponse and log all the activities @@ -68,6 +67,7 @@ class AuditHttpCode(Enum): PERMISSION_UNAUTHORIZED_ERROR = 401 PERMISSION_FORBIDDEN_ERROR = 403 RESPONSE_ERROR = 400 + PAGE_NOT_FOUND_ERROR = 404 SERVER_INTERNAL_ERROR = 500 SERVICE_UNAVAILABLE_ERROR = 503 DATA_ERROR = 1030 @@ -94,7 +94,8 @@ class AuditResponseCode(Enum): elif http_status_code in [AuditHttpCode.PERMISSION_UNAUTHORIZED_ERROR.value, AuditHttpCode.PERMISSION_FORBIDDEN_ERROR.value]: response_code = AuditResponseCode.PERMISSION_ERROR - elif http_status_code == AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value: + elif http_status_code in [AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value, + AuditHttpCode.PAGE_NOT_FOUND_ERROR.value]: response_code = AuditResponseCode.AVAILABILITY_ERROR elif http_status_code == AuditHttpCode.SERVER_INTERNAL_ERROR.value: response_code = AuditResponseCode.BUSINESS_PROCESS_ERROR @@ -125,9 +126,9 @@ class _Audit(object): :kwargs: - put any request related params into kwargs """ - _service_name = "" + SERVICE_INSTANCE_UUID = str(uuid.uuid4()) + service_name = "" _service_version = "" - _service_instance_uuid = str(uuid.uuid4()) _started = datetime.utcnow() _key_format = re.compile(r"\W") _logger_debug = None @@ -144,15 +145,15 @@ class _Audit(object): @staticmethod def init(service_name, config_file_path): """init static invariants and loggers""" - _Audit._service_name = service_name + _Audit.service_name = service_name _Audit._logger_debug = CommonLogger(config_file_path, "debug", \ - instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name) + instanceUUID=_Audit.SERVICE_INSTANCE_UUID, serviceName=_Audit.service_name) _Audit._logger_error = CommonLogger(config_file_path, "error", \ - instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name) + instanceUUID=_Audit.SERVICE_INSTANCE_UUID, serviceName=_Audit.service_name) _Audit._logger_metrics = CommonLogger(config_file_path, "metrics", \ - instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name) + instanceUUID=_Audit.SERVICE_INSTANCE_UUID, serviceName=_Audit.service_name) _Audit._logger_audit = CommonLogger(config_file_path, "audit", \ - instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name) + instanceUUID=_Audit.SERVICE_INSTANCE_UUID, serviceName=_Audit.service_name) ProcessInfo.init() try: _Audit._service_version = subprocess.check_output( @@ -175,7 +176,7 @@ class _Audit(object): :req_message: is the request message string for logging :kwargs: - put any request related params into kwargs """ - self.job_name = _Audit._key_format.sub('_', job_name or req_message or _Audit._service_name) + self.job_name = _Audit._key_format.sub('_', job_name or req_message or _Audit.service_name) self.request_id = request_id self.req_message = req_message or "" self.kwargs = kwargs or {} @@ -200,9 +201,9 @@ class _Audit(object): utcnow = datetime.utcnow() health = { "server" : { - "service_name" : _Audit._service_name, + "service_name" : _Audit.service_name, "service_version" : _Audit._service_version, - "service_instance_uuid" : _Audit._service_instance_uuid + "service_instance_uuid" : _Audit.SERVICE_INSTANCE_UUID }, "runtime" : { "started" : str(_Audit._started), @@ -214,11 +215,12 @@ class _Audit(object): "process_memory" : ProcessInfo.process_memory() }, "stats" : _Audit._health.dump(), - "items" : dict((health_name, health_getter()) - for health_name, health_getter in _Audit._health_checkers.items()), "soft" : {"python" : _Audit._py_ver, "packages" : _Audit._packages} } - self.info("{} health: {}".format(_Audit._service_name, + health.update(dict((health_name, health_getter()) + for health_name, health_getter in _Audit._health_checkers.items()) + ) + self.info("{} health: {}".format(_Audit.service_name, json.dumps(health, sort_keys=True))) return health @@ -226,7 +228,7 @@ class _Audit(object): def process_info(self): """get the debug info on all the threads and memory""" process_info = ProcessInfo.get_all() - self.info("{} process_info: {}".format(_Audit._service_name, json.dumps(process_info))) + self.info("{} process_info: {}".format(_Audit.service_name, json.dumps(process_info))) return process_info diff --git a/policyhandler/pdp_api/__init__.py b/policyhandler/pdp_api/__init__.py new file mode 100644 index 0000000..4d009ed --- /dev/null +++ b/policyhandler/pdp_api/__init__.py @@ -0,0 +1,30 @@ +# ================================================================================ +# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# + +"""2019 http api to policy-engine https://<policy-engine>:<port>/decision/v1/ POST""" + +from .policy_matcher import PolicyMatcher +from .policy_rest import PolicyRest +from .policy_listener import PolicyListener +from .policy_updates import PolicyUpdates + +def get_pdp_api_info(): + """info on which version of pdp api is in effect""" + return ("folders: PolicyMatcher({}), PolicyRest({}), PolicyListener({}), PolicyUpdates({})" + .format(PolicyMatcher.PDP_API_FOLDER, PolicyRest.PDP_API_FOLDER, + PolicyListener.PDP_API_FOLDER, PolicyUpdates.PDP_API_FOLDER + )) diff --git a/policyhandler/pdp_api/pdp_consts.py b/policyhandler/pdp_api/pdp_consts.py new file mode 100644 index 0000000..2337456 --- /dev/null +++ b/policyhandler/pdp_api/pdp_consts.py @@ -0,0 +1,35 @@ +# ================================================================================ +# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +"""contants of PDP""" + +# fields from pdp API 2018 +POLICY_NAME = "policyName" +POLICY_VERSION = "policyVersion" +POLICY_CONFIG = 'config' + +# fields from pdp API 2019 +PDP_POLICIES = 'policies' +PDP_PROPERTIES = 'properties' +PDP_METADATA = 'metadata' +PDP_POLICY_ID = 'policy-id' +PDP_POLICY_VERSION = 'policy-version' + +# req to PDP +PDP_REQ_ONAP_NAME = "ONAPName" # always "DCAE" +PDP_REQ_ONAP_COMPONENT = "ONAPComponent" +PDP_REQ_ONAP_INSTANCE = "ONAPInstance" +PDP_REQ_RESOURCE = "resource" diff --git a/policyhandler/pdp_api/policy_listener.py b/policyhandler/pdp_api/policy_listener.py new file mode 100644 index 0000000..9fa4695 --- /dev/null +++ b/policyhandler/pdp_api/policy_listener.py @@ -0,0 +1,55 @@ +# ================================================================================ +# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# + +""" +policy-listener communicates with policy-engine +to receive push notifications +on updates and removal of policies. + +on receiving the policy-notifications, the policy-receiver +passes the notifications to policy-updater +""" + +import os + +from ..utils import ToBeImplementedException, Utils + +_LOGGER = Utils.get_logger(__file__) + +class PolicyListener(object): + """listener to PolicyEngine""" + PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__))) + + def __init__(self, *_): + """listener to receive the policy notifications from PolicyEngine""" + _LOGGER.info("to_be_implemented") + raise ToBeImplementedException() + + def reconfigure(self, _): + """configure and reconfigure the listener""" + _LOGGER.info("to_be_implemented") + raise ToBeImplementedException() + + def run(self): + """listen on web-socket and pass the policy notifications to policy-updater""" + _LOGGER.info("to_be_implemented") + raise ToBeImplementedException() + + def shutdown(self, _): + """Shutdown the policy-listener""" + _LOGGER.info("to_be_implemented") + raise ToBeImplementedException() diff --git a/policyhandler/pdp_api/policy_matcher.py b/policyhandler/pdp_api/policy_matcher.py new file mode 100644 index 0000000..57258c3 --- /dev/null +++ b/policyhandler/pdp_api/policy_matcher.py @@ -0,0 +1,25 @@ +# ================================================================================ +# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# + +"""policy-matcher matches the policies from deployment-handler to policies from policy-engine""" + +import os + + +class PolicyMatcher(object): + """policy-matcher - static class""" + PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__))) diff --git a/policyhandler/pdp_api/policy_rest.py b/policyhandler/pdp_api/policy_rest.py new file mode 100644 index 0000000..14d9296 --- /dev/null +++ b/policyhandler/pdp_api/policy_rest.py @@ -0,0 +1,215 @@ +# ================================================================================ +# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# + +"""policy-client communicates with policy-engine thru REST API""" + +import copy +import json +import os +import urllib.parse +from threading import Lock + +import requests + +from ..config import Config, Settings +from ..onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, + AuditResponseCode, Metrics) +from ..utils import Utils +from .pdp_consts import PDP_POLICIES +from .policy_utils import PolicyUtils + +_LOGGER = Utils.get_logger(__file__) + +class PolicyRest(object): + """using the http API to policy-engine""" + PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__))) + _lazy_inited = False + DEFAULT_TIMEOUT_IN_SECS = 60 + + _lock = Lock() + _settings = Settings(Config.FIELD_POLICY_ENGINE, Config.POOL_CONNECTIONS) + + _target_entity = None + _requests_session = None + _url = None + _url_pdp_decision = None + _headers = None + _custom_kwargs = {} + _timeout_in_secs = DEFAULT_TIMEOUT_IN_SECS + + @staticmethod + def _init(): + """init static config""" + PolicyRest._custom_kwargs = {} + tls_ca_mode = None + + if not PolicyRest._requests_session: + PolicyRest._requests_session = requests.Session() + + changed, pool_size = PolicyRest._settings.get_by_key(Config.POOL_CONNECTIONS, 20) + if changed: + PolicyRest._requests_session.mount( + 'https://', requests.adapters.HTTPAdapter(pool_connections=pool_size, + pool_maxsize=pool_size)) + PolicyRest._requests_session.mount( + 'http://', requests.adapters.HTTPAdapter(pool_connections=pool_size, + pool_maxsize=pool_size)) + + _, config = PolicyRest._settings.get_by_key(Config.FIELD_POLICY_ENGINE) + if config: + PolicyRest._url = config.get("url") + if PolicyRest._url: + PolicyRest._url_pdp_decision = urllib.parse.urljoin( + PolicyRest._url, config.get("path_decision", "/decision/v1/")) + PolicyRest._headers = config.get("headers", {}) + PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE) + + tls_ca_mode = config.get(Config.TLS_CA_MODE) + PolicyRest._custom_kwargs = Config.get_requests_kwargs(tls_ca_mode) + PolicyRest._timeout_in_secs = config.get(Config.TIMEOUT_IN_SECS) + if not PolicyRest._timeout_in_secs or PolicyRest._timeout_in_secs < 1: + PolicyRest._timeout_in_secs = PolicyRest.DEFAULT_TIMEOUT_IN_SECS + + _LOGGER.info( + "PDP(%s) url(%s) headers(%s) tls_ca_mode(%s) timeout_in_secs(%s) custom_kwargs(%s): %s", + PolicyRest._target_entity, PolicyRest._url_pdp_decision, + Metrics.json_dumps(PolicyRest._headers), tls_ca_mode, + PolicyRest._timeout_in_secs, json.dumps(PolicyRest._custom_kwargs), + PolicyRest._settings) + + PolicyRest._settings.commit_change() + PolicyRest._lazy_inited = True + + @staticmethod + def reconfigure(): + """reconfigure""" + with PolicyRest._lock: + PolicyRest._settings.set_config(Config.discovered_config) + if not PolicyRest._settings.is_changed(): + PolicyRest._settings.commit_change() + return False + + PolicyRest._lazy_inited = False + PolicyRest._init() + return True + + @staticmethod + def _lazy_init(): + """init static config""" + if PolicyRest._lazy_inited: + return + + with PolicyRest._lock: + if PolicyRest._lazy_inited: + return + + PolicyRest._settings.set_config(Config.discovered_config) + PolicyRest._init() + + @staticmethod + def _pdp_get_decision(audit, pdp_req): + """Communication with the policy-engine""" + if not PolicyRest._url: + _LOGGER.error( + audit.error("no url for PDP", error_code=AuditResponseCode.AVAILABILITY_ERROR)) + audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + return None + + with PolicyRest._lock: + session = PolicyRest._requests_session + target_entity = PolicyRest._target_entity + url = PolicyRest._url_pdp_decision + timeout_in_secs = PolicyRest._timeout_in_secs + headers = copy.deepcopy(PolicyRest._headers) + custom_kwargs = copy.deepcopy(PolicyRest._custom_kwargs) + + metrics = Metrics(aud_parent=audit, targetEntity=target_entity, targetServiceName=url) + + headers[REQUEST_X_ECOMP_REQUESTID] = metrics.request_id + + log_action = "post to {} at {}".format(target_entity, url) + log_data = "msg={} headers={}, custom_kwargs({}) timeout_in_secs({})".format( + json.dumps(pdp_req), Metrics.json_dumps(headers), json.dumps(custom_kwargs), + timeout_in_secs) + log_line = log_action + " " + log_data + + _LOGGER.info(metrics.metrics_start(log_line)) + + res = None + try: + res = session.post(url, json=pdp_req, headers=headers, timeout=timeout_in_secs, + **custom_kwargs) + except Exception as ex: + error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value + if isinstance(ex, requests.exceptions.RequestException) + else AuditHttpCode.SERVER_INTERNAL_ERROR.value) + error_msg = ("failed {}: {} to {}".format(type(ex).__name__, str(ex), log_line)) + + _LOGGER.exception(error_msg) + metrics.set_http_status_code(error_code) + audit.set_http_status_code(error_code) + metrics.metrics(error_msg) + return None + + log_line = "response {} from {}: text={} headers={}".format( + res.status_code, log_line, res.text, + Metrics.json_dumps(dict(res.request.headers.items()))) + + _LOGGER.info(log_line) + metrics.set_http_status_code(res.status_code) + audit.set_http_status_code(res.status_code) + metrics.metrics(log_line) + + policy_bodies = None + if res.status_code == requests.codes.ok: + policy_bodies = res.json().get(PDP_POLICIES) + + return policy_bodies + + @staticmethod + def get_latest_policy(aud_policy_id): + """safely try retrieving the latest policy for the policy_id from the policy-engine""" + audit, policy_id, _, _ = aud_policy_id + try: + PolicyRest._lazy_init() + + pdp_req = PolicyUtils.gen_req_to_pdp(policy_id) + policy_bodies = PolicyRest._pdp_get_decision(audit, pdp_req) + + log_line = "looking for policy_id({}) in policy_bodies: {}".format( + policy_id, json.dumps(policy_bodies)) + _LOGGER.info(log_line) + + latest_policy = None + if policy_bodies and policy_id in policy_bodies: + latest_policy = PolicyUtils.convert_to_policy(policy_bodies[policy_id]) + + if not PolicyUtils.validate_policy(latest_policy): + audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value) + _LOGGER.error(audit.error( + "received invalid policy from PDP: {}".format(json.dumps(latest_policy)), + error_code=AuditResponseCode.DATA_ERROR)) + + return latest_policy + except Exception as ex: + error_msg = ("{}: get_latest_policy({}) crash {}: {}" + .format(audit.request_id, policy_id, type(ex).__name__, str(ex))) + + _LOGGER.exception(error_msg) + audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + return None diff --git a/policyhandler/pdp_api/policy_updates.py b/policyhandler/pdp_api/policy_updates.py new file mode 100644 index 0000000..eb3c3d1 --- /dev/null +++ b/policyhandler/pdp_api/policy_updates.py @@ -0,0 +1,49 @@ +# ================================================================================ +# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# + +"""policy-updates accumulates the policy-update notifications from PDP""" + +import os + +from ..utils import Utils, ToBeImplementedException + + +_LOGGER = Utils.get_logger(__file__) + +class PolicyUpdates(object): + """Keep and consolidate the policy-updates (audit, policies_updated, policies_removed)""" + PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__))) + + def __init__(self): + """init and reset""" + + def reset(self): + """resets the state""" + self.__init__() + + def pop_policy_updates(self): + """ + Returns the consolidated (audit, policies_updated, policies_removed) + and resets the state + """ + _LOGGER.info("to_be_implemented") + return None, None, None + + def push_policy_updates(self, *_): + """consolidate the new policies_updated, policies_removed to existing ones""" + _LOGGER.info("to_be_implemented") + raise ToBeImplementedException() diff --git a/policyhandler/pdp_api/policy_utils.py b/policyhandler/pdp_api/policy_utils.py new file mode 100644 index 0000000..1d06d14 --- /dev/null +++ b/policyhandler/pdp_api/policy_utils.py @@ -0,0 +1,123 @@ +# ================================================================================ +# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# + +"""utils for policy usage and conversions""" + +from ..onap.audit import Audit +from ..policy_consts import POLICY_BODY, POLICY_ID +from .pdp_consts import (PDP_METADATA, PDP_POLICY_ID, + PDP_POLICY_VERSION, PDP_PROPERTIES, + PDP_REQ_ONAP_COMPONENT, PDP_REQ_ONAP_INSTANCE, + PDP_REQ_ONAP_NAME, PDP_REQ_RESOURCE, POLICY_CONFIG, + POLICY_NAME, POLICY_VERSION) + + +class PolicyUtils(object): + """policy-client utils""" + + @staticmethod + def gen_req_to_pdp(policy_id): + """request to get a single policy from pdp by policy_id""" + return { + PDP_REQ_ONAP_NAME: "DCAE", + PDP_REQ_ONAP_COMPONENT: Audit.service_name, + PDP_REQ_ONAP_INSTANCE: Audit.SERVICE_INSTANCE_UUID, + "action": "configure", + PDP_REQ_RESOURCE: {PDP_POLICY_ID: [policy_id]} + } + + @staticmethod + def convert_to_policy(policy_body): + """ + set policy id, name, version, config=properties and + wrap policy_body received from policy-engine with policy_id + + input: + { + "type": "onap.policies.monitoring.cdap.tca.hi.lo.app", + "version": "1.0.0", + "metadata": { + "policy-id": "onap.scaleout.tca", + "policy-version": 1, + "description": "The scaleout policy for vDNS" + }, + "properties": { + "tca_policy": { + "domain": "measurementsForVfScaling", + "metricsPerEventName": [ + { + "eventName": "vLoadBalancer", + "controlLoopSchemaType": "VNF", + "policyScope": "type=configuration" + } + ] + } + } + } + + output: + { + "policy_id": "onap.scaleout.tca", + "policy_body": { + "policyName": "onap.scaleout.tca.1.xml", + "policyVersion": 1, + "type": "onap.policies.monitoring.cdap.tca.hi.lo.app", + "version": "1.0.0", + "metadata": { + "policy-id": "onap.scaleout.tca", + "policy-version": 1, + "description": "The scaleout policy for vDNS" + }, + "config": { + "tca_policy": { + "domain": "measurementsForVfScaling", + "metricsPerEventName": [ + { + "eventName": "vLoadBalancer", + "controlLoopSchemaType": "VNF", + "policyScope": "type=configuration" + } + ] + } + } + } + } + """ + if not policy_body or not policy_body.get(PDP_PROPERTIES): + return None + + pdp_metadata = policy_body.get(PDP_METADATA, {}) + policy_id = pdp_metadata.get(PDP_POLICY_ID) + policy_version = pdp_metadata.get(PDP_POLICY_VERSION) + if not policy_id or not policy_version: + return None + + policy_body[POLICY_NAME] = "{}.{}.xml".format(policy_id, policy_version) + policy_body[POLICY_VERSION] = str(policy_version) + policy_body[POLICY_CONFIG] = policy_body[PDP_PROPERTIES] + del policy_body[PDP_PROPERTIES] + + return {POLICY_ID:policy_id, POLICY_BODY:policy_body} + + @staticmethod + def validate_policy(policy): + """validate have non-empty config in policy""" + if not policy: + return False + + policy_body = policy.get(POLICY_BODY) + return bool(policy_body and policy_body.get(POLICY_CONFIG)) diff --git a/policyhandler/pdp_api_v0/__init__.py b/policyhandler/pdp_api_v0/__init__.py new file mode 100644 index 0000000..0196508 --- /dev/null +++ b/policyhandler/pdp_api_v0/__init__.py @@ -0,0 +1,30 @@ +# ================================================================================ +# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# + +"""<=2018 http api to policy-engine /getConfig that is going to be replaced in 2019""" + +from .policy_matcher import PolicyMatcher +from .policy_rest import PolicyRest +from .policy_listener import PolicyListener +from .policy_updates import PolicyUpdates + +def get_pdp_api_info(): + """info on which version of pdp api is in effect""" + return ("folders: PolicyMatcher({}), PolicyRest({}), PolicyListener({}), PolicyUpdates({})" + .format(PolicyMatcher.PDP_API_FOLDER, PolicyRest.PDP_API_FOLDER, + PolicyListener.PDP_API_FOLDER, PolicyUpdates.PDP_API_FOLDER + )) diff --git a/policyhandler/pdp_api_v0/pdp_consts.py b/policyhandler/pdp_api_v0/pdp_consts.py new file mode 100644 index 0000000..d1c0b44 --- /dev/null +++ b/policyhandler/pdp_api_v0/pdp_consts.py @@ -0,0 +1,23 @@ +# ================================================================================ +# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# + +"""contants of PDP""" + +POLICY_VERSION = "policyVersion" +POLICY_NAME = "policyName" +POLICY_CONFIG = 'config' +MATCHING_CONDITIONS = "matchingConditions" diff --git a/policyhandler/pdp_api_v0/policy_listener.py b/policyhandler/pdp_api_v0/policy_listener.py new file mode 100644 index 0000000..67e4c49 --- /dev/null +++ b/policyhandler/pdp_api_v0/policy_listener.py @@ -0,0 +1,309 @@ +# ================================================================================ +# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# + +""" +policy-listener communicates with policy-engine +thru web-socket to receive push notifications +on updates and removal of policies. + +on receiving the policy-notifications, the policy-receiver +passes the notifications to policy-updater +""" + +import copy +import json +import os +import ssl +import time +import urllib.parse +from datetime import datetime +from threading import Lock, Thread + +import websocket + +from ..config import Config, Settings +from ..onap.audit import Audit +from ..utils import Utils +from .pdp_consts import MATCHING_CONDITIONS, POLICY_NAME, POLICY_VERSION + +LOADED_POLICIES = 'loadedPolicies' +REMOVED_POLICIES = 'removedPolicies' +POLICY_VER = 'versionNo' +POLICY_MATCHES = 'matches' + +_LOGGER = Utils.get_logger(__file__) + +class PolicyListener(Thread): + """web-socket to PolicyEngine""" + PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__))) + WS_STARTED = "started" + WS_START_COUNT = "start_count" + WS_CLOSE_COUNT = "close_count" + WS_ERROR_COUNT = "error_count" + WS_PONG_COUNT = "pong_count" + WS_MESSAGE_COUNT = "message_count" + WS_MESSAGE_TIMESTAMP = "message_timestamp" + WS_STATUS = "status" + WS_PING_INTERVAL_DEFAULT = 30 + WEB_SOCKET_HEALTH = "web_socket_health" + + def __init__(self, audit, policy_updater): + """web-socket inside the thread to receive policy notifications from PolicyEngine""" + Thread.__init__(self, name="policy_receiver", daemon=True) + + self._policy_updater = policy_updater + self._lock = Lock() + self._keep_running = True + self._settings = Settings(Config.FIELD_POLICY_ENGINE) + + self._sleep_before_restarting = 5 + self._web_socket_url = None + self._web_socket_sslopt = None + self._tls_wss_ca_mode = None + self._web_socket = None + self._ws_ping_interval_in_secs = PolicyListener.WS_PING_INTERVAL_DEFAULT + self._web_socket_health = { + PolicyListener.WS_START_COUNT: 0, + PolicyListener.WS_CLOSE_COUNT: 0, + PolicyListener.WS_ERROR_COUNT: 0, + PolicyListener.WS_PONG_COUNT: 0, + PolicyListener.WS_MESSAGE_COUNT: 0, + PolicyListener.WS_STATUS: "created" + } + + Audit.register_item_health(PolicyListener.WEB_SOCKET_HEALTH, self._get_health) + self.reconfigure(audit) + + def reconfigure(self, audit): + """configure and reconfigure the web-socket""" + with self._lock: + _LOGGER.info(audit.info("web_socket_health {}".format( + json.dumps(self._get_health(), sort_keys=True)))) + self._sleep_before_restarting = 5 + self._settings.set_config(Config.discovered_config) + changed, config = self._settings.get_by_key(Config.FIELD_POLICY_ENGINE) + + if not changed: + self._settings.commit_change() + return False + + prev_web_socket_url = self._web_socket_url + prev_web_socket_sslopt = self._web_socket_sslopt + prev_ws_ping_interval_in_secs = self._ws_ping_interval_in_secs + + self._web_socket_sslopt = None + + resturl = urllib.parse.urljoin(config.get("url", "").lower().rstrip("/") + "/", + config.get("path_notifications", "/pdp/notifications")) + + self._tls_wss_ca_mode = config.get(Config.TLS_WSS_CA_MODE) + + self._ws_ping_interval_in_secs = config.get(Config.WS_PING_INTERVAL_IN_SECS) + if not self._ws_ping_interval_in_secs or self._ws_ping_interval_in_secs < 60: + self._ws_ping_interval_in_secs = PolicyListener.WS_PING_INTERVAL_DEFAULT + + if resturl.startswith("https:"): + self._web_socket_url = resturl.replace("https:", "wss:") + + verify = Config.get_tls_verify(self._tls_wss_ca_mode) + if verify is False: + self._web_socket_sslopt = {'cert_reqs': ssl.CERT_NONE} + elif verify is True: + pass + else: + self._web_socket_sslopt = {'ca_certs': verify} + + else: + self._web_socket_url = resturl.replace("http:", "ws:") + + log_changed = ( + "changed web_socket_url(%s) or tls_wss_ca_mode(%s)" + " or ws_ping_interval_in_secs(%s): %s" % + (self._web_socket_url, self._tls_wss_ca_mode, self._ws_ping_interval_in_secs, + self._settings)) + if (self._web_socket_url == prev_web_socket_url + and Utils.are_the_same(prev_web_socket_sslopt, self._web_socket_sslopt) + and prev_ws_ping_interval_in_secs == self._ws_ping_interval_in_secs): + _LOGGER.info(audit.info("not {}".format(log_changed))) + self._settings.commit_change() + return False + + _LOGGER.info(audit.info(log_changed)) + self._settings.commit_change() + + self._stop_notifications() + return True + + def run(self): + """listen on web-socket and pass the policy notifications to policy-updater""" + _LOGGER.info("starting policy_receiver...") + websocket.enableTrace(True) + restarting = False + while True: + if not self._get_keep_running(): + break + + self._stop_notifications() + + if restarting: + with self._lock: + sleep_before_restarting = self._sleep_before_restarting + _LOGGER.info( + "going to sleep for %s secs before restarting policy-notifications", + sleep_before_restarting) + + time.sleep(sleep_before_restarting) + if not self._get_keep_running(): + break + + with self._lock: + web_socket_url = self._web_socket_url + sslopt = copy.deepcopy(self._web_socket_sslopt) + tls_wss_ca_mode = self._tls_wss_ca_mode + ws_ping_interval_in_secs = self._ws_ping_interval_in_secs + + _LOGGER.info( + "connecting to policy-notifications at %s with sslopt(%s) tls_wss_ca_mode(%s)" + " ws_ping_interval_in_secs(%s)", + web_socket_url, json.dumps(sslopt), tls_wss_ca_mode, ws_ping_interval_in_secs) + + self._web_socket = websocket.WebSocketApp( + web_socket_url, + on_open=self._on_ws_open, + on_message=self._on_pdp_message, + on_close=self._on_ws_close, + on_error=self._on_ws_error, + on_pong=self._on_ws_pong + ) + + _LOGGER.info("waiting for policy-notifications...") + self._web_socket.run_forever(sslopt=sslopt, ping_interval=ws_ping_interval_in_secs) + restarting = True + + Audit.register_item_health(PolicyListener.WEB_SOCKET_HEALTH) + _LOGGER.info("exit policy-receiver") + + def _get_keep_running(self): + """thread-safe check whether to continue running""" + with self._lock: + keep_running = self._keep_running + return keep_running + + def _stop_notifications(self): + """close the web-socket == stops the notification service if running.""" + with self._lock: + if self._web_socket and self._web_socket.sock and self._web_socket.sock.connected: + self._web_socket.close() + _LOGGER.info("stopped receiving notifications from PDP") + + def _on_pdp_message(self, *args): + """received the notification from PDP""" + self._web_socket_health[PolicyListener.WS_MESSAGE_COUNT] += 1 + self._web_socket_health[PolicyListener.WS_MESSAGE_TIMESTAMP] = str(datetime.utcnow()) + try: + message = args and args[-1] + _LOGGER.info("Received notification message: %s", message) + _LOGGER.info("web_socket_health %s", json.dumps(self._get_health(), sort_keys=True)) + if not message: + return + message = json.loads(message) + + if not message or not isinstance(message, dict): + _LOGGER.warning("unexpected message from PDP: %s", json.dumps(message)) + return + + policies_updated = [ + {POLICY_NAME: policy.get(POLICY_NAME), + POLICY_VERSION: policy.get(POLICY_VER), + MATCHING_CONDITIONS: policy.get(POLICY_MATCHES, {})} + for policy in message.get(LOADED_POLICIES, []) + ] + + policies_removed = [ + {POLICY_NAME: removed_policy.get(POLICY_NAME), + POLICY_VERSION: removed_policy.get(POLICY_VER)} + for removed_policy in message.get(REMOVED_POLICIES, []) + ] + + if not policies_updated and not policies_removed: + _LOGGER.info("no policy updated or removed") + return + + self._policy_updater.policy_update(policies_updated, policies_removed) + except Exception as ex: + error_msg = "crash {} {} at {}: {}".format(type(ex).__name__, str(ex), + "on_pdp_message", json.dumps(message)) + + _LOGGER.exception(error_msg) + + def _on_ws_error(self, error): + """report an error""" + _LOGGER.exception("policy-notification error %s", str(error)) + self._sleep_before_restarting = 60 if isinstance(error, ssl.SSLError) else 5 + + self._web_socket_health[PolicyListener.WS_STATUS] = "error" + self._web_socket_health[PolicyListener.WS_ERROR_COUNT] += 1 + self._web_socket_health["last_error"] = { + "error": str(error), "timestamp": str(datetime.utcnow()) + } + _LOGGER.info("web_socket_health %s", json.dumps(self._get_health(), sort_keys=True)) + + def _on_ws_close(self, code, reason): + """restart web-socket on close""" + self._web_socket_health["last_closed"] = str(datetime.utcnow()) + self._web_socket_health[PolicyListener.WS_STATUS] = "closed" + self._web_socket_health[PolicyListener.WS_CLOSE_COUNT] += 1 + _LOGGER.info( + "lost connection(%s, %s) to PDP web_socket_health %s", + code, reason, json.dumps(self._get_health(), sort_keys=True)) + + def _on_ws_open(self): + """started web-socket""" + self._web_socket_health[PolicyListener.WS_STATUS] = PolicyListener.WS_STARTED + self._web_socket_health[PolicyListener.WS_START_COUNT] += 1 + self._web_socket_health[PolicyListener.WS_STARTED] = datetime.utcnow() + _LOGGER.info("opened connection to PDP web_socket_health %s", + json.dumps(self._get_health(), sort_keys=True)) + + def _on_ws_pong(self, pong): + """pong = response to pinging the server of the web-socket""" + self._web_socket_health[PolicyListener.WS_PONG_COUNT] += 1 + _LOGGER.info( + "pong(%s) from connection to PDP web_socket_health %s", + pong, json.dumps(self._get_health(), sort_keys=True)) + + def _get_health(self): + """returns the healthcheck of the web-socket as json""" + web_socket_health = copy.deepcopy(self._web_socket_health) + web_socket_health[Config.WS_PING_INTERVAL_IN_SECS] = self._ws_ping_interval_in_secs + started = web_socket_health.get(PolicyListener.WS_STARTED) + if started: + web_socket_health[PolicyListener.WS_STARTED] = str(started) + web_socket_health["uptime"] = str(datetime.utcnow() - started) + return web_socket_health + + + def shutdown(self, audit): + """Shutdown the policy-listener""" + _LOGGER.info(audit.info("shutdown policy-listener")) + with self._lock: + self._keep_running = False + + self._stop_notifications() + + if self.is_alive(): + self.join() diff --git a/policyhandler/policy_matcher.py b/policyhandler/pdp_api_v0/policy_matcher.py index d0786ba..357af49 100644 --- a/policyhandler/policy_matcher.py +++ b/policyhandler/pdp_api_v0/policy_matcher.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,27 +14,28 @@ # limitations under the License. # ============LICENSE_END========================================================= # -# ECOMP is a trademark and service mark of AT&T Intellectual Property. """policy-matcher matches the policies from deployment-handler to policies from policy-engine""" import json -import logging +import os import re -from .deploy_handler import DeployHandler, PolicyUpdateMessage -from .onap.audit import AuditHttpCode, AuditResponseCode -from .policy_consts import (ERRORED_POLICIES, LATEST_POLICIES, - MATCHING_CONDITIONS, POLICY_BODY, POLICY_FILTER, - POLICY_NAME, POLICY_VERSION, POLICY_VERSIONS) +from ..deploy_handler import DeployHandler, PolicyUpdateMessage +from ..onap.audit import AuditHttpCode, AuditResponseCode +from ..policy_consts import (ERRORED_POLICIES, LATEST_POLICIES, POLICY_BODY, + POLICY_FILTER, POLICY_VERSIONS) +from ..utils import RegexCoarser, Utils +from .pdp_consts import MATCHING_CONDITIONS, POLICY_NAME, POLICY_VERSION from .policy_rest import PolicyRest -from .policy_utils import RegexCoarser +_LOGGER = Utils.get_logger(__file__) + class PolicyMatcher(object): """policy-matcher - static class""" - _logger = logging.getLogger("policy_handler.policy_matcher") PENDING_UPDATE = "pending_update" + PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__))) @staticmethod def get_deployed_policies(audit): @@ -43,12 +44,12 @@ class PolicyMatcher(object): if audit.is_not_found(): warning_txt = "got no deployed policies or policy-filters" - PolicyMatcher._logger.warning(warning_txt) + _LOGGER.warning(warning_txt) return {"warning": warning_txt}, None, None if not audit.is_success() or (not deployed_policies and not deployed_policy_filters): error_txt = "failed to retrieve policies from deployment-handler" - PolicyMatcher._logger.error(error_txt) + _LOGGER.error(error_txt) return {"error": error_txt}, None, None return None, deployed_policies, deployed_policy_filters @@ -62,7 +63,7 @@ class PolicyMatcher(object): if not (deployed_policies or deployed_policy_filters): error_txt = "no deployed policies or policy-filters" - PolicyMatcher._logger.warning(error_txt) + _LOGGER.warning(error_txt) return {"error": error_txt}, None coarse_regex_patterns = PolicyMatcher.calc_coarse_patterns( @@ -72,7 +73,7 @@ class PolicyMatcher(object): error_txt = ("failed to construct the coarse_regex_patterns from " + "deployed_policies: {} and deployed_policy_filters: {}" .format(deployed_policies, deployed_policy_filters)) - PolicyMatcher._logger.error(audit.error( + _LOGGER.error(audit.error( error_txt, error_code=AuditResponseCode.DATA_ERROR)) audit.set_http_status_code(AuditHttpCode.DATA_ERROR.value) return {"error": error_txt}, None @@ -84,7 +85,7 @@ class PolicyMatcher(object): if not audit.is_success(): error_txt = "failed to retrieve policies from policy-engine" - PolicyMatcher._logger.warning(error_txt) + _LOGGER.warning(error_txt) return {"error": error_txt}, None latest_policies = pdp_response.get(LATEST_POLICIES, {}) @@ -123,7 +124,7 @@ class PolicyMatcher(object): coarse_regex.add_regex_pattern(policy_name_pattern) coarse_regex_patterns = coarse_regex.get_coarse_regex_patterns() - PolicyMatcher._logger.debug( + _LOGGER.debug( audit.debug("coarse_regex_patterns({}) combined_regex_pattern({}) for patterns({})" .format(coarse_regex_patterns, coarse_regex.get_combined_regex_pattern(), @@ -219,11 +220,11 @@ class PolicyMatcher(object): log_line = "policy {} to filter id {}: {}".format(json.dumps(policy), policy_filter_id, json.dumps(policy_filter)) - # PolicyMatcher._logger.debug(audit.debug("matching {}...".format(log_line))) + # _LOGGER.debug(audit.debug("matching {}...".format(log_line))) if (filter_policy_name != policy_id and filter_policy_name != policy_name and not re.match(filter_policy_name, policy_name)): - PolicyMatcher._logger.debug( + _LOGGER.debug( audit.debug("not match by policyName: {} != {}: {}" .format(policy_name, filter_policy_name, log_line))) return False @@ -235,7 +236,7 @@ class PolicyMatcher(object): filter_onap_name = policy_filter.get("onapName") policy_onap_name = matching_conditions.get("ONAPName") if filter_onap_name and filter_onap_name != policy_onap_name: - PolicyMatcher._logger.debug( + _LOGGER.debug( audit.debug("not match by ONAPName: {} != {}: {}" .format(policy_onap_name, filter_onap_name, log_line))) return False @@ -243,7 +244,7 @@ class PolicyMatcher(object): filter_config_name = policy_filter.get("configName") policy_config_name = matching_conditions.get("ConfigName") if filter_config_name and filter_config_name != policy_config_name: - PolicyMatcher._logger.debug( + _LOGGER.debug( audit.debug("not match by configName: {} != {}: {}" .format(policy_config_name, filter_config_name, log_line))) return False @@ -253,12 +254,12 @@ class PolicyMatcher(object): for filter_key, filter_config_attribute in filter_config_attributes.items(): if (filter_key not in matching_conditions or filter_config_attribute != matching_conditions.get(filter_key)): - PolicyMatcher._logger.debug( + _LOGGER.debug( audit.debug("not match by configAttributes: {} != {}: {}" .format(json.dumps(matching_conditions), json.dumps(filter_config_attributes), log_line))) return False - PolicyMatcher._logger.debug(audit.debug("matched {}".format(log_line))) + _LOGGER.debug(audit.debug("matched {}".format(log_line))) return True diff --git a/policyhandler/policy_rest.py b/policyhandler/pdp_api_v0/policy_rest.py index 85dd914..c59625e 100644 --- a/policyhandler/policy_rest.py +++ b/policyhandler/pdp_api_v0/policy_rest.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,13 +14,12 @@ # limitations under the License. # ============LICENSE_END========================================================= # -# ECOMP is a trademark and service mark of AT&T Intellectual Property. """policy-client communicates with policy-engine thru REST API""" import copy import json -import logging +import os import time import urllib.parse from multiprocessing.dummy import Pool as ThreadPool @@ -28,18 +27,21 @@ from threading import Lock import requests -from .config import Config, Settings -from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, - AuditResponseCode, Metrics) -from .policy_consts import (ERRORED_POLICIES, LATEST_POLICIES, POLICY_BODY, - POLICY_CONFIG, POLICY_FILTER, POLICY_FILTERS, - POLICY_ID, POLICY_NAME) +from ..config import Config, Settings +from ..onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, + AuditResponseCode, Metrics) +from ..policy_consts import (ERRORED_POLICIES, LATEST_POLICIES, POLICY_BODY, + POLICY_FILTER, POLICY_FILTERS, POLICY_ID, + POLICY_NAMES) +from ..utils import Utils +from .pdp_consts import POLICY_CONFIG, POLICY_NAME, POLICY_VERSION from .policy_utils import PolicyUtils +_LOGGER = Utils.get_logger(__file__) class PolicyRest(object): """using the http API to policy-engine""" - _logger = logging.getLogger("policy_handler.policy_rest") + PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__))) _lazy_inited = False POLICY_GET_CONFIG = 'getConfig' PDP_CONFIG_STATUS = "policyConfigStatus" @@ -60,6 +62,7 @@ class PolicyRest(object): Config.POLICY_RETRY_COUNT, Config.POLICY_RETRY_SLEEP) _requests_session = None + _url = None _url_get_config = None _headers = None _target_entity = None @@ -73,8 +76,7 @@ class PolicyRest(object): def _init(): """init static config""" PolicyRest._custom_kwargs = {} - - _, config = PolicyRest._settings.get_by_key(Config.FIELD_POLICY_ENGINE) + tls_ca_mode = None if not PolicyRest._requests_session: PolicyRest._requests_session = requests.Session() @@ -88,28 +90,33 @@ class PolicyRest(object): 'http://', requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size)) - get_config_path = urllib.parse.urljoin( - config.get("path_api", "pdp/api").strip("/") + "/", PolicyRest.POLICY_GET_CONFIG) - PolicyRest._url_get_config = urllib.parse.urljoin(config.get("url", ""), get_config_path) - PolicyRest._headers = config.get("headers", {}) - PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE) - _, PolicyRest._thread_pool_size = PolicyRest._settings.get_by_key( - Config.THREAD_POOL_SIZE, 4) - if PolicyRest._thread_pool_size < 2: - PolicyRest._thread_pool_size = 2 - - _, PolicyRest._policy_retry_count = PolicyRest._settings.get_by_key( - Config.POLICY_RETRY_COUNT, 1) - _, PolicyRest._policy_retry_sleep = PolicyRest._settings.get_by_key( - Config.POLICY_RETRY_SLEEP, 0) - - tls_ca_mode = config.get(Config.TLS_CA_MODE) - PolicyRest._custom_kwargs = Config.get_requests_kwargs(tls_ca_mode) - PolicyRest._timeout_in_secs = config.get(Config.TIMEOUT_IN_SECS) - if not PolicyRest._timeout_in_secs or PolicyRest._timeout_in_secs < 1: - PolicyRest._timeout_in_secs = PolicyRest.DEFAULT_TIMEOUT_IN_SECS - - PolicyRest._logger.info( + _, config = PolicyRest._settings.get_by_key(Config.FIELD_POLICY_ENGINE) + if config: + PolicyRest._url = config.get("url") + if PolicyRest._url: + path_get_config = urllib.parse.urljoin( + config.get("path_api", "pdp/api").strip("/") + + "/", PolicyRest.POLICY_GET_CONFIG) + PolicyRest._url_get_config = urllib.parse.urljoin(PolicyRest._url, path_get_config) + PolicyRest._headers = config.get("headers", {}) + PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE) + _, PolicyRest._thread_pool_size = PolicyRest._settings.get_by_key( + Config.THREAD_POOL_SIZE, 4) + if PolicyRest._thread_pool_size < 2: + PolicyRest._thread_pool_size = 2 + + _, PolicyRest._policy_retry_count = PolicyRest._settings.get_by_key( + Config.POLICY_RETRY_COUNT, 1) + _, PolicyRest._policy_retry_sleep = PolicyRest._settings.get_by_key( + Config.POLICY_RETRY_SLEEP, 0) + + tls_ca_mode = config.get(Config.TLS_CA_MODE) + PolicyRest._custom_kwargs = Config.get_requests_kwargs(tls_ca_mode) + PolicyRest._timeout_in_secs = config.get(Config.TIMEOUT_IN_SECS) + if not PolicyRest._timeout_in_secs or PolicyRest._timeout_in_secs < 1: + PolicyRest._timeout_in_secs = PolicyRest.DEFAULT_TIMEOUT_IN_SECS + + _LOGGER.info( "PDP(%s) url(%s) headers(%s) tls_ca_mode(%s) timeout_in_secs(%s) custom_kwargs(%s): %s", PolicyRest._target_entity, PolicyRest._url_get_config, Metrics.json_dumps(PolicyRest._headers), tls_ca_mode, @@ -148,6 +155,12 @@ class PolicyRest(object): @staticmethod def _pdp_get_config(audit, json_body): """Communication with the policy-engine""" + if not PolicyRest._url: + _LOGGER.error( + audit.error("no url for PDP", error_code=AuditResponseCode.AVAILABILITY_ERROR)) + audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + return None + with PolicyRest._lock: session = PolicyRest._requests_session target_entity = PolicyRest._target_entity @@ -166,7 +179,7 @@ class PolicyRest(object): timeout_in_secs) log_line = log_action + " " + log_data - PolicyRest._logger.info(metrics.metrics_start(log_line)) + _LOGGER.info(metrics.metrics_start(log_line)) res = None try: @@ -178,7 +191,7 @@ class PolicyRest(object): else AuditHttpCode.SERVER_INTERNAL_ERROR.value) error_msg = ("failed {}: {} to {}".format(type(ex).__name__, str(ex), log_line)) - PolicyRest._logger.exception(error_msg) + _LOGGER.exception(error_msg) metrics.set_http_status_code(error_code) audit.set_http_status_code(error_code) metrics.metrics(error_msg) @@ -195,7 +208,7 @@ class PolicyRest(object): metrics.set_http_status_code(res.status_code) metrics.metrics(log_line) - PolicyRest._logger.info(log_line) + _LOGGER.info(log_line) return res.status_code, res_data @staticmethod @@ -213,7 +226,7 @@ class PolicyRest(object): error_code = AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value error_msg = "{} unexpected {}".format(error_code, log_line) - PolicyRest._logger.error(error_msg) + _LOGGER.error(error_msg) metrics.set_http_status_code(error_code) audit.set_http_status_code(error_code) metrics.metrics(error_msg) @@ -236,7 +249,7 @@ class PolicyRest(object): status_code = AuditHttpCode.DATA_NOT_FOUND_OK.value info_msg = "{} not found {}".format(status_code, log_line) - PolicyRest._logger.info(info_msg) + _LOGGER.info(info_msg) metrics.set_http_status_code(status_code) metrics.metrics(info_msg) return status_code, None @@ -273,7 +286,7 @@ class PolicyRest(object): .format(audit.request_id, type(ex).__name__, str(ex), "get_latest_policy", str_metrics)) - PolicyRest._logger.exception(error_msg) + _LOGGER.exception(error_msg) audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) return None @@ -290,8 +303,7 @@ class PolicyRest(object): expect_policy_removed = (ignore_policy_names and not expected_versions) for retry in range(1, PolicyRest._policy_retry_count + 1): - PolicyRest._logger.debug("try(%s) retry_get_config(%s): %s", - retry, retry_get_config, str_metrics) + _LOGGER.debug("try(%s) retry_get_config(%s): %s", retry, retry_get_config, str_metrics) done, latest_policy, status_code = PolicyRest._get_latest_policy_once( audit, policy_id, expected_versions, ignore_policy_names, @@ -301,13 +313,13 @@ class PolicyRest(object): break if retry == PolicyRest._policy_retry_count: - PolicyRest._logger.error( + _LOGGER.error( audit.error("gave up retrying after #{} for policy_id({}) from PDP {}" .format(retry, policy_id, PolicyRest._url_get_config), error_code=AuditResponseCode.DATA_ERROR)) break - PolicyRest._logger.warning(audit.warn( + _LOGGER.warning(audit.warn( "will retry({}) for policy_id({}) in {} secs from PDP {}".format( retry, policy_id, PolicyRest._policy_retry_sleep, PolicyRest._url_get_config), error_code=AuditResponseCode.DATA_ERROR)) @@ -321,7 +333,7 @@ class PolicyRest(object): audit.set_http_status_code(status_code) if not PolicyRest._validate_policy(latest_policy): audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value) - PolicyRest._logger.error(audit.error( + _LOGGER.error(audit.error( "received invalid policy from PDP: {}".format(json.dumps(latest_policy)), error_code=AuditResponseCode.DATA_ERROR)) @@ -335,15 +347,15 @@ class PolicyRest(object): status_code, policy_bodies = PolicyRest._pdp_get_config(audit, {POLICY_NAME:policy_id}) - PolicyRest._logger.debug("%s %s policy_bodies: %s", - status_code, policy_id, json.dumps(policy_bodies or [])) + _LOGGER.debug("%s %s policy_bodies: %s", + status_code, policy_id, json.dumps(policy_bodies or [])) latest_policy = PolicyUtils.select_latest_policy( policy_bodies, expected_versions, ignore_policy_names ) if not latest_policy and not expect_policy_removed: - PolicyRest._logger.error( + _LOGGER.error( audit.error("received unexpected policy data from PDP for policy_id={}: {}" .format(policy_id, json.dumps(policy_bodies or [])), error_code=AuditResponseCode.DATA_ERROR)) @@ -355,9 +367,16 @@ class PolicyRest(object): return done, latest_policy, status_code @staticmethod - def get_latest_updated_policies(aud_policy_updates): + def get_latest_updated_policies(audit, updated_policies, removed_policies): """safely try retrieving the latest policies for the list of policy_names""" - audit, policies_updated, policies_removed = aud_policy_updates + if not updated_policies and not removed_policies: + return None, None + + policies_updated = [(policy_id, policy.get(POLICY_BODY, {}).get(POLICY_VERSION)) + for policy_id, policy in updated_policies.items()] + policies_removed = [(policy_id, policy.get(POLICY_NAMES, {})) + for policy_id, policy in removed_policies.items()] + if not policies_updated and not policies_removed: return None, None @@ -374,7 +393,7 @@ class PolicyRest(object): .format(audit.request_id, type(ex).__name__, str(ex), "get_latest_updated_policies", str_metrics)) - PolicyRest._logger.exception(error_msg) + _LOGGER.exception(error_msg) audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) return None, None @@ -389,7 +408,7 @@ class PolicyRest(object): targetServiceName=PolicyRest._url_get_config) metrics_total.metrics_start("get_latest_updated_policies {0}".format(str_metrics)) - PolicyRest._logger.debug(str_metrics) + _LOGGER.debug(str_metrics) policies_to_find = {} for (policy_id, policy_version) in policies_updated: @@ -424,8 +443,8 @@ class PolicyRest(object): policies = None apns_length = len(apns) - PolicyRest._logger.debug("apns_length(%s) policies_to_find %s", apns_length, - json.dumps(policies_to_find)) + _LOGGER.debug("apns_length(%s) policies_to_find %s", apns_length, + json.dumps(policies_to_find)) if apns_length == 1: policies = [PolicyRest.get_latest_policy(apns[0])] @@ -454,7 +473,7 @@ class PolicyRest(object): if policy_id not in updated_policies and policy_id not in removed_policies) - PolicyRest._logger.debug( + _LOGGER.debug( "result(%s) updated_policies %s, removed_policies %s, errored_policies %s", apns_length, json.dumps(updated_policies), json.dumps(removed_policies), json.dumps(errored_policies)) @@ -474,19 +493,19 @@ class PolicyRest(object): audit, policy_filter = aud_policy_filter try: str_policy_filter = json.dumps(policy_filter) - PolicyRest._logger.debug("%s", str_policy_filter) + _LOGGER.debug("%s", str_policy_filter) status_code, policy_bodies = PolicyRest._pdp_get_config(audit, policy_filter) audit.set_http_status_code(status_code) - PolicyRest._logger.debug("%s policy_bodies: %s %s", status_code, - str_policy_filter, json.dumps(policy_bodies or [])) + _LOGGER.debug("%s policy_bodies: %s %s", status_code, + str_policy_filter, json.dumps(policy_bodies or [])) latest_policies = PolicyUtils.select_latest_policies(policy_bodies) if not latest_policies: audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value) - PolicyRest._logger.warning(audit.warn( + _LOGGER.warning(audit.warn( "received no policies from PDP for policy_filter {}: {}" .format(str_policy_filter, json.dumps(policy_bodies or [])), error_code=AuditResponseCode.DATA_ERROR)) @@ -506,7 +525,7 @@ class PolicyRest(object): .format(audit.request_id, type(ex).__name__, str(ex), "_get_latest_policies", json.dumps(policy_filter))) - PolicyRest._logger.exception(error_msg) + _LOGGER.exception(error_msg) audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) return None, None @@ -545,7 +564,7 @@ class PolicyRest(object): else: return result - PolicyRest._logger.debug("%s", str_policy_filters) + _LOGGER.debug("%s", str_policy_filters) metrics_total = Metrics(aud_parent=audit, targetEntity=target_entity, targetServiceName=PolicyRest._url_get_config) @@ -571,8 +590,8 @@ class PolicyRest(object): result[ERRORED_POLICIES] = dict( pair for (_, eps) in latest_policies if eps for pair in eps.items()) - PolicyRest._logger.debug("got policies for policy_filters: %s. result: %s", - str_policy_filters, json.dumps(result)) + _LOGGER.debug("got policies for policy_filters: %s. result: %s", + str_policy_filters, json.dumps(result)) return result except Exception as ex: @@ -580,7 +599,7 @@ class PolicyRest(object): .format(audit.request_id, type(ex).__name__, str(ex), "get_latest_policies", str_metrics)) - PolicyRest._logger.exception(error_msg) + _LOGGER.exception(error_msg) audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) return None diff --git a/policyhandler/pdp_api_v0/policy_updates.py b/policyhandler/pdp_api_v0/policy_updates.py new file mode 100644 index 0000000..eafdca2 --- /dev/null +++ b/policyhandler/pdp_api_v0/policy_updates.py @@ -0,0 +1,107 @@ +# ================================================================================ +# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# + +"""policy-updates accumulates the policy-update notifications from PDP""" + +import json +import os + +from ..onap.audit import Audit +from ..policy_consts import POLICY_ID, POLICY_NAMES +from ..utils import Utils +from .pdp_consts import POLICY_NAME +from .policy_utils import PolicyUtils + + +_LOGGER = Utils.get_logger(__file__) + +class PolicyUpdates(object): + """Keep and consolidate the policy-updates (audit, policies_updated, policies_removed)""" + PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__))) + + def __init__(self): + """init and reset""" + self._audit = None + self._policies_updated = {} + self._policies_removed = {} + + def reset(self): + """resets the state""" + self.__init__() + + def pop_policy_updates(self): + """ + Returns the consolidated (audit, policies_updated, policies_removed) + and resets the state + """ + if not self._audit: + return None, None, None + + audit = self._audit + policies_updated = self._policies_updated + policies_removed = self._policies_removed + + self.reset() + + return audit, policies_updated, policies_removed + + + def push_policy_updates(self, policies_updated, policies_removed): + """consolidate the new policies_updated, policies_removed to existing ones""" + for policy_body in policies_updated: + policy_name = policy_body.get(POLICY_NAME) + policy = PolicyUtils.convert_to_policy(policy_body) + if not policy: + continue + policy_id = policy.get(POLICY_ID) + + self._policies_updated[policy_id] = policy + + rm_policy_names = self._policies_removed.get(policy_id, {}).get(POLICY_NAMES) + if rm_policy_names and policy_name in rm_policy_names: + del rm_policy_names[policy_name] + + for policy_body in policies_removed: + policy_name = policy_body.get(POLICY_NAME) + policy = PolicyUtils.convert_to_policy(policy_body) + if not policy: + continue + policy_id = policy.get(POLICY_ID) + + if policy_id in self._policies_removed: + policy = self._policies_removed[policy_id] + + if POLICY_NAMES not in policy: + policy[POLICY_NAMES] = {} + policy[POLICY_NAMES][policy_name] = True + self._policies_removed[policy_id] = policy + + req_message = ("policy-update notification - updated[{0}], removed[{1}]" + .format(len(self._policies_updated), + len(self._policies_removed))) + + if not self._audit: + self._audit = Audit(job_name="policy_update", + req_message=req_message, + retry_get_config=True) + else: + self._audit.req_message = req_message + + _LOGGER.info( + "pending(%s) for %s policies_updated %s policies_removed %s", + self._audit.request_id, req_message, + json.dumps(self._policies_updated), json.dumps(self._policies_removed)) diff --git a/policyhandler/pdp_api_v0/policy_utils.py b/policyhandler/pdp_api_v0/policy_utils.py new file mode 100644 index 0000000..d337665 --- /dev/null +++ b/policyhandler/pdp_api_v0/policy_utils.py @@ -0,0 +1,120 @@ +# ================================================================================ +# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# + +"""utils for policy usage and conversions""" + +import re + +from ..policy_consts import POLICY_BODY, POLICY_ID +from ..utils import Utils +from .pdp_consts import POLICY_CONFIG, POLICY_NAME, POLICY_VERSION + + +class PolicyUtils(object): + """policy-client utils""" + _policy_name_ext = re.compile('[.][0-9]+[.][a-zA-Z]+$') + + @staticmethod + def extract_policy_id(policy_name): + """ policy_name = policy_id + "." + <version> + "." + <extension> + For instance, + policy_name = DCAE_alex.Config_alex_policy_number_1.3.xml + policy_id = DCAE_alex.Config_alex_policy_number_1 + policy_scope = DCAE_alex + policy_class = Config + policy_version = 3 + type = extension = xml + delimiter = "." + policy_class_delimiter = "_" + policy_name in PAP = DCAE_alex.alex_policy_number_1 + """ + if not policy_name: + return + return PolicyUtils._policy_name_ext.sub('', policy_name) + + @staticmethod + def parse_policy_config(policy): + """try parsing the config in policy.""" + if not policy: + return policy + config = policy.get(POLICY_BODY, {}).get(POLICY_CONFIG) + if config: + policy[POLICY_BODY][POLICY_CONFIG] = Utils.safe_json_parse(config) + return policy + + @staticmethod + def convert_to_policy(policy_body): + """wrap policy_body received from policy-engine with policy_id.""" + if not policy_body: + return None + policy_name = policy_body.get(POLICY_NAME) + policy_version = policy_body.get(POLICY_VERSION) + if not policy_name or not policy_version: + return None + policy_id = PolicyUtils.extract_policy_id(policy_name) + if not policy_id: + return None + return {POLICY_ID:policy_id, POLICY_BODY:policy_body} + + @staticmethod + def select_latest_policy(policy_bodies, expected_versions=None, ignore_policy_names=None): + """For some reason, the policy-engine returns all version of the policy_bodies. + DCAE-Controller is only interested in the latest version + """ + if not policy_bodies: + return + latest_policy_body = {} + for policy_body in policy_bodies: + policy_name = policy_body.get(POLICY_NAME) + policy_version = policy_body.get(POLICY_VERSION) + if not policy_name or not policy_version or not policy_version.isdigit(): + continue + if expected_versions and policy_version not in expected_versions: + continue + if ignore_policy_names and policy_name in ignore_policy_names: + continue + + if (not latest_policy_body + or int(latest_policy_body[POLICY_VERSION]) < int(policy_version)): + latest_policy_body = policy_body + + return PolicyUtils.parse_policy_config(PolicyUtils.convert_to_policy(latest_policy_body)) + + @staticmethod + def select_latest_policies(policy_bodies): + """For some reason, the policy-engine returns all version of the policy_bodies. + DCAE-Controller is only interested in the latest versions + """ + if not policy_bodies: + return {} + policies = {} + for policy_body in policy_bodies: + policy = PolicyUtils.convert_to_policy(policy_body) + if not policy: + continue + policy_id = policy.get(POLICY_ID) + policy_version = policy.get(POLICY_BODY, {}).get(POLICY_VERSION) + if not policy_id or not policy_version or not policy_version.isdigit(): + continue + if (policy_id not in policies + or int(policy_version) > int(policies[policy_id][POLICY_BODY][POLICY_VERSION])): + policies[policy_id] = policy + + for policy_id in policies: + policies[policy_id] = PolicyUtils.parse_policy_config(policies[policy_id]) + + return policies diff --git a/policyhandler/pdp_client.py b/policyhandler/pdp_client.py new file mode 100644 index 0000000..353b604 --- /dev/null +++ b/policyhandler/pdp_client.py @@ -0,0 +1,29 @@ +# ================================================================================ +# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# + +"""policy-client communicates with policy-engine thru REST API""" + +from .config import Config +from .utils import Utils + +if Config.is_pdp_api_default(): + from .pdp_api import * +else: + from .pdp_api_v0 import * + +_LOGGER = Utils.get_logger(__file__) +_LOGGER.info(get_pdp_api_info()) diff --git a/policyhandler/policy_consts.py b/policyhandler/policy_consts.py index 8f3ec76..2eafc53 100644 --- a/policyhandler/policy_consts.py +++ b/policyhandler/policy_consts.py @@ -14,15 +14,11 @@ # limitations under the License. # ============LICENSE_END========================================================= # -# ECOMP is a trademark and service mark of AT&T Intellectual Property. """contants of policy-handler""" POLICY_ID = 'policy_id' -POLICY_VERSION = "policyVersion" -POLICY_NAME = "policyName" POLICY_BODY = 'policy_body' -POLICY_CONFIG = 'config' CATCH_UP = "catch_up" AUTO_CATCH_UP = "auto catch_up" @@ -34,7 +30,6 @@ POLICY_FILTER = "policy_filter" POLICY_FILTERS = "policy_filters" POLICIES = "policies" POLICY_VERSIONS = "policy_versions" -MATCHING_CONDITIONS = "matchingConditions" POLICY_NAMES = "policy_names" POLICY_FILTER_MATCHES = "policy_filter_matches" TARGET_ENTITY = "target_entity" diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py index 7cf1869..d949c4b 100644 --- a/policyhandler/policy_receiver.py +++ b/policyhandler/policy_receiver.py @@ -14,7 +14,6 @@ # limitations under the License. # ============LICENSE_END========================================================= # -# ECOMP is a trademark and service mark of AT&T Intellectual Property. """ policy-receiver communicates with policy-engine @@ -25,323 +24,37 @@ on receiving the policy-notifications, the policy-receiver passes the notifications to policy-updater """ -import copy -import json -import logging -import ssl -import time -import urllib.parse -from datetime import datetime -from threading import Lock, Thread - -import websocket - -from .config import Config, Settings -from .onap.audit import Audit -from .policy_consts import MATCHING_CONDITIONS, POLICY_NAME, POLICY_VERSION -from .policy_updater import PolicyUpdater -from .policy_utils import Utils from .service_activator import ServiceActivator -LOADED_POLICIES = 'loadedPolicies' -REMOVED_POLICIES = 'removedPolicies' -POLICY_VER = 'versionNo' -POLICY_MATCHES = 'matches' - -class _PolicyReceiver(Thread): - """web-socket to PolicyEngine""" - _logger = logging.getLogger("policy_handler.policy_receiver") - WS_STARTED = "started" - WS_START_COUNT = "start_count" - WS_CLOSE_COUNT = "close_count" - WS_ERROR_COUNT = "error_count" - WS_PONG_COUNT = "pong_count" - WS_MESSAGE_COUNT = "message_count" - WS_MESSAGE_TIMESTAMP = "message_timestamp" - WS_STATUS = "status" - WS_PING_INTERVAL_DEFAULT = 30 - WEB_SOCKET_HEALTH = "web_socket_health" - - def __init__(self, audit, policy_updater): - """web-socket inside the thread to receive policy notifications from PolicyEngine""" - Thread.__init__(self, name="policy_receiver", daemon=True) - - self._policy_updater = policy_updater - self._lock = Lock() - self._keep_running = True - self._settings = Settings(Config.FIELD_POLICY_ENGINE) - - self._sleep_before_restarting = 5 - self._web_socket_url = None - self._web_socket_sslopt = None - self._tls_wss_ca_mode = None - self._web_socket = None - self._ws_ping_interval_in_secs = _PolicyReceiver.WS_PING_INTERVAL_DEFAULT - self._web_socket_health = { - _PolicyReceiver.WS_START_COUNT: 0, - _PolicyReceiver.WS_CLOSE_COUNT: 0, - _PolicyReceiver.WS_ERROR_COUNT: 0, - _PolicyReceiver.WS_PONG_COUNT: 0, - _PolicyReceiver.WS_MESSAGE_COUNT: 0, - _PolicyReceiver.WS_STATUS: "created" - } - - Audit.register_item_health(_PolicyReceiver.WEB_SOCKET_HEALTH, self._get_health) - self.reconfigure(audit) - - def reconfigure(self, audit): - """configure and reconfigure the web-socket""" - with self._lock: - _PolicyReceiver._logger.info(audit.info("web_socket_health {}".format( - json.dumps(self._get_health(), sort_keys=True)))) - self._sleep_before_restarting = 5 - self._settings.set_config(Config.discovered_config) - changed, config = self._settings.get_by_key(Config.FIELD_POLICY_ENGINE) - - if not changed: - self._settings.commit_change() - return False - - prev_web_socket_url = self._web_socket_url - prev_web_socket_sslopt = self._web_socket_sslopt - prev_ws_ping_interval_in_secs = self._ws_ping_interval_in_secs - - self._web_socket_sslopt = None - - resturl = urllib.parse.urljoin(config.get("url", "").lower().rstrip("/") + "/", - config.get("path_notifications", "/pdp/notifications")) - - self._tls_wss_ca_mode = config.get(Config.TLS_WSS_CA_MODE) - - self._ws_ping_interval_in_secs = config.get(Config.WS_PING_INTERVAL_IN_SECS) - if not self._ws_ping_interval_in_secs or self._ws_ping_interval_in_secs < 60: - self._ws_ping_interval_in_secs = _PolicyReceiver.WS_PING_INTERVAL_DEFAULT - - if resturl.startswith("https:"): - self._web_socket_url = resturl.replace("https:", "wss:") - - verify = Config.get_tls_verify(self._tls_wss_ca_mode) - if verify is False: - self._web_socket_sslopt = {'cert_reqs': ssl.CERT_NONE} - elif verify is True: - pass - else: - self._web_socket_sslopt = {'ca_certs': verify} - - else: - self._web_socket_url = resturl.replace("http:", "ws:") - - log_changed = ( - "changed web_socket_url(%s) or tls_wss_ca_mode(%s)" - " or ws_ping_interval_in_secs(%s): %s" % - (self._web_socket_url, self._tls_wss_ca_mode, self._ws_ping_interval_in_secs, - self._settings)) - if (self._web_socket_url == prev_web_socket_url - and Utils.are_the_same(prev_web_socket_sslopt, self._web_socket_sslopt) - and prev_ws_ping_interval_in_secs == self._ws_ping_interval_in_secs): - _PolicyReceiver._logger.info(audit.info("not {}".format(log_changed))) - self._settings.commit_change() - return False - - _PolicyReceiver._logger.info(audit.info(log_changed)) - self._settings.commit_change() - - self._stop_notifications() - return True - - def run(self): - """listen on web-socket and pass the policy notifications to policy-updater""" - _PolicyReceiver._logger.info("starting policy_receiver...") - websocket.enableTrace(True) - restarting = False - while True: - if not self._get_keep_running(): - break - - self._stop_notifications() - - if restarting: - with self._lock: - sleep_before_restarting = self._sleep_before_restarting - _PolicyReceiver._logger.info( - "going to sleep for %s secs before restarting policy-notifications", - sleep_before_restarting) - - time.sleep(sleep_before_restarting) - if not self._get_keep_running(): - break - - with self._lock: - web_socket_url = self._web_socket_url - sslopt = copy.deepcopy(self._web_socket_sslopt) - tls_wss_ca_mode = self._tls_wss_ca_mode - ws_ping_interval_in_secs = self._ws_ping_interval_in_secs - - _PolicyReceiver._logger.info( - "connecting to policy-notifications at %s with sslopt(%s) tls_wss_ca_mode(%s)" - " ws_ping_interval_in_secs(%s)", - web_socket_url, json.dumps(sslopt), tls_wss_ca_mode, ws_ping_interval_in_secs) - - self._web_socket = websocket.WebSocketApp( - web_socket_url, - on_open=self._on_ws_open, - on_message=self._on_pdp_message, - on_close=self._on_ws_close, - on_error=self._on_ws_error, - on_pong=self._on_ws_pong - ) - - _PolicyReceiver._logger.info("waiting for policy-notifications...") - self._web_socket.run_forever(sslopt=sslopt, ping_interval=ws_ping_interval_in_secs) - restarting = True - - Audit.register_item_health(_PolicyReceiver.WEB_SOCKET_HEALTH) - _PolicyReceiver._logger.info("exit policy-receiver") - - def _get_keep_running(self): - """thread-safe check whether to continue running""" - with self._lock: - keep_running = self._keep_running - return keep_running - - def _stop_notifications(self): - """close the web-socket == stops the notification service if running.""" - with self._lock: - if self._web_socket and self._web_socket.sock and self._web_socket.sock.connected: - self._web_socket.close() - _PolicyReceiver._logger.info("stopped receiving notifications from PDP") - - def _on_pdp_message(self, *args): - """received the notification from PDP""" - self._web_socket_health[_PolicyReceiver.WS_MESSAGE_COUNT] += 1 - self._web_socket_health[_PolicyReceiver.WS_MESSAGE_TIMESTAMP] = str(datetime.utcnow()) - try: - message = args and args[-1] - _PolicyReceiver._logger.info("Received notification message: %s", message) - _PolicyReceiver._logger.info("web_socket_health %s", - json.dumps(self._get_health(), sort_keys=True)) - if not message: - return - message = json.loads(message) - - if not message or not isinstance(message, dict): - _PolicyReceiver._logger.warning("unexpected message from PDP: %s", - json.dumps(message)) - return - - policies_updated = [ - {POLICY_NAME: policy.get(POLICY_NAME), - POLICY_VERSION: policy.get(POLICY_VER), - MATCHING_CONDITIONS: policy.get(POLICY_MATCHES, {})} - for policy in message.get(LOADED_POLICIES, []) - ] - - policies_removed = [ - {POLICY_NAME: removed_policy.get(POLICY_NAME), - POLICY_VERSION: removed_policy.get(POLICY_VER)} - for removed_policy in message.get(REMOVED_POLICIES, []) - ] - - if not policies_updated and not policies_removed: - _PolicyReceiver._logger.info("no policy updated or removed") - return - - self._policy_updater.policy_update(policies_updated, policies_removed) - except Exception as ex: - error_msg = "crash {} {} at {}: {}".format(type(ex).__name__, str(ex), - "on_pdp_message", json.dumps(message)) - - _PolicyReceiver._logger.exception(error_msg) - - def _on_ws_error(self, error): - """report an error""" - _PolicyReceiver._logger.exception("policy-notification error %s", str(error)) - self._sleep_before_restarting = 60 if isinstance(error, ssl.SSLError) else 5 - - self._web_socket_health[_PolicyReceiver.WS_STATUS] = "error" - self._web_socket_health[_PolicyReceiver.WS_ERROR_COUNT] += 1 - self._web_socket_health["last_error"] = { - "error": str(error), "timestamp": str(datetime.utcnow()) - } - _PolicyReceiver._logger.info("web_socket_health %s", - json.dumps(self._get_health(), sort_keys=True)) - - def _on_ws_close(self, code, reason): - """restart web-socket on close""" - self._web_socket_health["last_closed"] = str(datetime.utcnow()) - self._web_socket_health[_PolicyReceiver.WS_STATUS] = "closed" - self._web_socket_health[_PolicyReceiver.WS_CLOSE_COUNT] += 1 - _PolicyReceiver._logger.info( - "lost connection(%s, %s) to PDP web_socket_health %s", - code, reason, json.dumps(self._get_health(), sort_keys=True)) - - def _on_ws_open(self): - """started web-socket""" - self._web_socket_health[_PolicyReceiver.WS_STATUS] = _PolicyReceiver.WS_STARTED - self._web_socket_health[_PolicyReceiver.WS_START_COUNT] += 1 - self._web_socket_health[_PolicyReceiver.WS_STARTED] = datetime.utcnow() - _PolicyReceiver._logger.info("opened connection to PDP web_socket_health %s", - json.dumps(self._get_health(), sort_keys=True)) - - def _on_ws_pong(self, pong): - """pong = response to pinging the server of the web-socket""" - self._web_socket_health[_PolicyReceiver.WS_PONG_COUNT] += 1 - _PolicyReceiver._logger.info( - "pong(%s) from connection to PDP web_socket_health %s", - pong, json.dumps(self._get_health(), sort_keys=True)) - - def _get_health(self): - """returns the healthcheck of the web-socket as json""" - web_socket_health = copy.deepcopy(self._web_socket_health) - web_socket_health[Config.WS_PING_INTERVAL_IN_SECS] = self._ws_ping_interval_in_secs - started = web_socket_health.get(_PolicyReceiver.WS_STARTED) - if started: - web_socket_health[_PolicyReceiver.WS_STARTED] = str(started) - web_socket_health["uptime"] = str(datetime.utcnow() - started) - return web_socket_health - - - def shutdown(self, audit): - """Shutdown the policy-receiver""" - _PolicyReceiver._logger.info(audit.info("shutdown policy-receiver")) - with self._lock: - self._keep_running = False - - self._stop_notifications() - - if self.is_alive(): - self.join() - - class PolicyReceiver(object): """ policy-receiver - static singleton wrapper around two threads policy_updater - master thread for all scheduled actions - policy_receiver - listens to policy-engine through web-socket + policy_listener - listens to policy-engine through web-socket """ _policy_updater = None - _policy_receiver = None + _policy_listener = None @staticmethod def is_running(): """check whether the policy-receiver runs""" - return (PolicyReceiver._policy_receiver - and PolicyReceiver._policy_receiver.is_alive() + return (PolicyReceiver._policy_listener + and PolicyReceiver._policy_listener.is_alive() and PolicyReceiver._policy_updater and PolicyReceiver._policy_updater.is_alive()) @staticmethod - def _close_receiver(audit): + def _close_listener(audit): """stop the notification-handler""" - if PolicyReceiver._policy_receiver: - policy_receiver = PolicyReceiver._policy_receiver - PolicyReceiver._policy_receiver = None + if PolicyReceiver._policy_listener: + policy_receiver = PolicyReceiver._policy_listener + PolicyReceiver._policy_listener = None policy_receiver.shutdown(audit) @staticmethod def shutdown(audit): """shutdown the notification-handler and policy-updater""" - PolicyReceiver._close_receiver(audit) + PolicyReceiver._close_listener(audit) PolicyReceiver._policy_updater.shutdown(audit) @staticmethod @@ -359,23 +72,26 @@ class PolicyReceiver(object): """act on reconfiguration event""" active = ServiceActivator.is_active_mode_of_operation(audit) - if not PolicyReceiver._policy_receiver: + if not PolicyReceiver._policy_listener: if active: - PolicyReceiver._policy_receiver = _PolicyReceiver(audit, - PolicyReceiver._policy_updater) - PolicyReceiver._policy_receiver.start() + from . import pdp_client + PolicyReceiver._policy_listener = pdp_client.PolicyListener( + audit, PolicyReceiver._policy_updater + ) + PolicyReceiver._policy_listener.start() return if not active: - PolicyReceiver._close_receiver(audit) + PolicyReceiver._close_listener(audit) return - PolicyReceiver._policy_receiver.reconfigure(audit) + PolicyReceiver._policy_listener.reconfigure(audit) @staticmethod def run(audit): """run policy_updater and policy_receiver""" + from .policy_updater import PolicyUpdater PolicyReceiver._policy_updater = PolicyUpdater(PolicyReceiver._on_reconfigure) PolicyReceiver._on_reconfigure(audit) diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py index af1ea4b..3fcde40 100644 --- a/policyhandler/policy_updater.py +++ b/policyhandler/policy_updater.py @@ -14,109 +14,26 @@ # limitations under the License. # ============LICENSE_END========================================================= # -# ECOMP is a trademark and service mark of AT&T Intellectual Property. """policy-updater thread""" import json -import logging from threading import Event, Lock, Thread +from . import pdp_client from .config import Config, Settings from .deploy_handler import DeployHandler, PolicyUpdateMessage from .onap.audit import Audit, AuditHttpCode, AuditResponseCode -from .policy_consts import (AUTO_CATCH_UP, AUTO_RECONFIGURE, CATCH_UP, - POLICY_BODY, POLICY_ID, POLICY_NAME, POLICY_NAMES, - POLICY_VERSION) -from .policy_matcher import PolicyMatcher -from .policy_rest import PolicyRest -from .policy_utils import PolicyUtils +from .policy_consts import AUTO_CATCH_UP, AUTO_RECONFIGURE, CATCH_UP from .service_activator import ServiceActivator from .step_timer import StepTimer +from .utils import Utils -class _PolicyUpdate(object): - """Keep and consolidate the policy-updates (audit, policies_updated, policies_removed)""" - _logger = logging.getLogger("policy_handler.policy_update") - - def __init__(self): - """init and reset""" - self._audit = None - self._policies_updated = {} - self._policies_removed = {} - - def reset(self): - """resets the state""" - self.__init__() - - def pop_policy_update(self): - """ - Returns the consolidated (audit, policies_updated, policies_removed) - and resets the state - """ - if not self._audit: - return None, None, None - - audit = self._audit - policies_updated = self._policies_updated - policies_removed = self._policies_removed - - self.reset() - - return audit, policies_updated, policies_removed - - - def push_policy_update(self, policies_updated, policies_removed): - """consolidate the new policies_updated, policies_removed to existing ones""" - for policy_body in policies_updated: - policy_name = policy_body.get(POLICY_NAME) - policy = PolicyUtils.convert_to_policy(policy_body) - if not policy: - continue - policy_id = policy.get(POLICY_ID) - - self._policies_updated[policy_id] = policy - - rm_policy_names = self._policies_removed.get(policy_id, {}).get(POLICY_NAMES) - if rm_policy_names and policy_name in rm_policy_names: - del rm_policy_names[policy_name] - - for policy_body in policies_removed: - policy_name = policy_body.get(POLICY_NAME) - policy = PolicyUtils.convert_to_policy(policy_body) - if not policy: - continue - policy_id = policy.get(POLICY_ID) - - if policy_id in self._policies_removed: - policy = self._policies_removed[policy_id] - - if POLICY_NAMES not in policy: - policy[POLICY_NAMES] = {} - policy[POLICY_NAMES][policy_name] = True - self._policies_removed[policy_id] = policy - - req_message = ("policy-update notification - updated[{0}], removed[{1}]" - .format(len(self._policies_updated), - len(self._policies_removed))) - - if not self._audit: - self._audit = Audit(job_name="policy_update", - req_message=req_message, - retry_get_config=True) - else: - self._audit.req_message = req_message - - self._logger.info( - "pending(%s) for %s policies_updated %s policies_removed %s", - self._audit.request_id, req_message, - json.dumps(self._policies_updated), json.dumps(self._policies_removed)) - +_LOGGER = Utils.get_logger(__file__) class PolicyUpdater(Thread): """sequentially handle the policy-updates and catch-ups in its own policy_updater thread""" - _logger = logging.getLogger("policy_handler.policy_updater") - def __init__(self, on_reconfigure_receiver): """init static config of PolicyUpdater.""" Thread.__init__(self, name="policy_updater", daemon=True) @@ -132,7 +49,7 @@ class PolicyUpdater(Thread): self._aud_shutdown = None self._aud_catch_up = None self._aud_reconfigure = None - self._policy_update = _PolicyUpdate() + self._policy_updates = pdp_client.PolicyUpdates() self._catch_up_interval = None self._reconfigure_interval = None @@ -151,7 +68,7 @@ class PolicyUpdater(Thread): _, reconfigure = self._settings.get_by_key(Config.RECONFIGURE, {}) self._reconfigure_interval = reconfigure.get(Config.TIMER_INTERVAL) or 10*60 - PolicyUpdater._logger.info( + _LOGGER.info( "intervals: catch_up(%s) reconfigure(%s): %s", self._catch_up_interval, self._reconfigure_interval, self._settings) self._settings.commit_change() @@ -160,7 +77,7 @@ class PolicyUpdater(Thread): def policy_update(self, policies_updated, policies_removed): """enqueue the policy-updates""" with self._lock: - self._policy_update.push_policy_update(policies_updated, policies_removed) + self._policy_updates.push_policy_updates(policies_updated, policies_removed) self._run.set() def catch_up(self, audit=None): @@ -168,7 +85,7 @@ class PolicyUpdater(Thread): with self._lock: if not self._aud_catch_up: self._aud_catch_up = audit or Audit(req_message=AUTO_CATCH_UP) - PolicyUpdater._logger.info( + _LOGGER.info( "catch_up %s request_id %s", self._aud_catch_up.req_message, self._aud_catch_up.request_id ) @@ -179,7 +96,7 @@ class PolicyUpdater(Thread): with self._lock: if not self._aud_reconfigure: self._aud_reconfigure = audit or Audit(req_message=AUTO_RECONFIGURE) - PolicyUpdater._logger.info( + _LOGGER.info( "%s request_id %s", self._aud_reconfigure.req_message, self._aud_reconfigure.request_id ) @@ -187,10 +104,10 @@ class PolicyUpdater(Thread): def run(self): """wait and run the policy-update in thread""" - PolicyUpdater._logger.info("starting policy_updater...") + _LOGGER.info("starting policy_updater...") self._run_reconfigure_timer() while True: - PolicyUpdater._logger.info("waiting for policy-updates...") + _LOGGER.info("waiting for policy-updates...") self._run.wait() with self._lock: @@ -209,7 +126,7 @@ class PolicyUpdater(Thread): self._on_policy_update() - PolicyUpdater._logger.info("exit policy-updater") + _LOGGER.info("exit policy-updater") def _keep_running(self): """thread-safe check whether to continue running""" @@ -226,18 +143,15 @@ class PolicyUpdater(Thread): return if self._catch_up_timer: - self._logger.info("next step catch_up_timer in %s", self._catch_up_interval) + _LOGGER.info("next step catch_up_timer in %s", self._catch_up_interval) self._catch_up_timer.next(self._catch_up_interval) return self._catch_up_timer = StepTimer( - "catch_up_timer", - self._catch_up_interval, - PolicyUpdater.catch_up, - PolicyUpdater._logger, - self + "catch_up_timer", self._catch_up_interval, + PolicyUpdater.catch_up, self ) - self._logger.info("started catch_up_timer in %s", self._catch_up_interval) + _LOGGER.info("started catch_up_timer in %s", self._catch_up_interval) self._catch_up_timer.start() def _run_reconfigure_timer(self): @@ -246,41 +160,38 @@ class PolicyUpdater(Thread): return if self._reconfigure_timer: - self._logger.info("next step reconfigure_timer in %s", self._reconfigure_interval) + _LOGGER.info("next step reconfigure_timer in %s", self._reconfigure_interval) self._reconfigure_timer.next(self._reconfigure_interval) return self._reconfigure_timer = StepTimer( - "reconfigure_timer", - self._reconfigure_interval, - PolicyUpdater.reconfigure, - PolicyUpdater._logger, - self + "reconfigure_timer", self._reconfigure_interval, + PolicyUpdater.reconfigure, self ) - self._logger.info("started reconfigure_timer in %s", self._reconfigure_interval) + _LOGGER.info("started reconfigure_timer in %s", self._reconfigure_interval) self._reconfigure_timer.start() def _pause_catch_up_timer(self): """pause catch_up_timer""" if self._catch_up_timer: - self._logger.info("pause catch_up_timer") + _LOGGER.info("pause catch_up_timer") self._catch_up_timer.pause() def _stop_timers(self): """stop and destroy the catch_up and reconfigure timers""" if self._catch_up_timer: - self._logger.info("stopping catch_up_timer") + _LOGGER.info("stopping catch_up_timer") self._catch_up_timer.stop() self._catch_up_timer.join() self._catch_up_timer = None - self._logger.info("stopped catch_up_timer") + _LOGGER.info("stopped catch_up_timer") if self._reconfigure_timer: - self._logger.info("stopping reconfigure_timer") + _LOGGER.info("stopping reconfigure_timer") self._reconfigure_timer.stop() self._reconfigure_timer.join() self._reconfigure_timer = None - self._logger.info("stopped reconfigure_timer") + _LOGGER.info("stopped reconfigure_timer") def _on_reconfigure(self): """bring the latest config and reconfigure""" @@ -296,7 +207,7 @@ class PolicyUpdater(Thread): reconfigure_result = "" try: need_to_catch_up = False - PolicyUpdater._logger.info(log_line) + _LOGGER.info(log_line) active_prev = ServiceActivator.is_active_mode_of_operation(aud_reconfigure) Config.discover(aud_reconfigure) @@ -314,7 +225,7 @@ class PolicyUpdater(Thread): if self._set_timer_intervals(): changed_configs.append("timer_intervals") - if PolicyRest.reconfigure(): + if pdp_client.PolicyRest.reconfigure(): need_to_catch_up = True changed_configs.append(Config.FIELD_POLICY_ENGINE) @@ -335,7 +246,7 @@ class PolicyUpdater(Thread): Config.discovered_config.commit_change() aud_reconfigure.audit_done(result=reconfigure_result) - PolicyUpdater._logger.info(log_line + reconfigure_result) + _LOGGER.info(log_line + reconfigure_result) if need_to_catch_up: self._pause_catch_up_timer() @@ -345,16 +256,15 @@ class PolicyUpdater(Thread): error_msg = "crash {} {}{}: {}: {}".format( "_on_reconfigure", log_line, reconfigure_result, type(ex).__name__, str(ex)) - PolicyUpdater._logger.exception(error_msg) + _LOGGER.exception(error_msg) aud_reconfigure.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) aud_reconfigure.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) aud_reconfigure.audit_done(result=error_msg) self._run_reconfigure_timer() - PolicyUpdater._logger.info("policy_handler health: %s", - json.dumps(aud_reconfigure.health(full=True))) - PolicyUpdater._logger.info("process_info: %s", json.dumps(aud_reconfigure.process_info())) + _LOGGER.info("policy_handler health: %s", json.dumps(aud_reconfigure.health(full=True))) + _LOGGER.info("process_info: %s", json.dumps(aud_reconfigure.process_info())) def _on_catch_up(self): @@ -363,7 +273,7 @@ class PolicyUpdater(Thread): aud_catch_up = self._aud_catch_up if self._aud_catch_up: self._aud_catch_up = None - self._policy_update.reset() + self._policy_updates.reset() if not aud_catch_up: return False @@ -374,12 +284,11 @@ class PolicyUpdater(Thread): ) self._pause_catch_up_timer() aud_catch_up.audit_done(result=catch_up_result) - PolicyUpdater._logger.info(catch_up_result) + _LOGGER.info(catch_up_result) self._run_catch_up_timer() - PolicyUpdater._logger.info("policy_handler health: %s", - json.dumps(aud_catch_up.health(full=True))) - PolicyUpdater._logger.info("process_info: %s", json.dumps(aud_catch_up.process_info())) + _LOGGER.info("policy_handler health: %s", json.dumps(aud_catch_up.health(full=True))) + _LOGGER.info("process_info: %s", json.dumps(aud_catch_up.process_info())) return False log_line = "catch_up {0} request_id {1}".format( @@ -388,25 +297,26 @@ class PolicyUpdater(Thread): catch_up_result = "" try: not_found_ok = None - PolicyUpdater._logger.info(log_line) + _LOGGER.info(log_line) self._pause_catch_up_timer() - _, policies, policy_filters = PolicyMatcher.get_deployed_policies(aud_catch_up) + (_, policies, + policy_filters) = pdp_client.PolicyMatcher.get_deployed_policies(aud_catch_up) catch_up_message = None if aud_catch_up.is_not_found(): not_found_ok = True else: - _, catch_up_message = PolicyMatcher.build_catch_up_message( + _, catch_up_message = pdp_client.PolicyMatcher.build_catch_up_message( aud_catch_up, policies, policy_filters) if not_found_ok: catch_up_result = ("- not sending catch-up " "- no deployed policies or policy-filters") - PolicyUpdater._logger.warning(catch_up_result) + _LOGGER.warning(catch_up_result) elif not (catch_up_message and aud_catch_up.is_success()): catch_up_result = "- not sending catch-up to deployment-handler due to errors" - PolicyUpdater._logger.warning(catch_up_result) + _LOGGER.warning(catch_up_result) elif catch_up_message.empty(): catch_up_result = "- not sending empty catch-up to deployment-handler" else: @@ -414,18 +324,18 @@ class PolicyUpdater(Thread): DeployHandler.policy_update(aud_catch_up, catch_up_message) if not aud_catch_up.is_success(): catch_up_result = "- failed to send catch-up to deployment-handler" - PolicyUpdater._logger.warning(catch_up_result) + _LOGGER.warning(catch_up_result) else: catch_up_result = "- sent catch-up to deployment-handler" success, _, _ = aud_catch_up.audit_done(result=catch_up_result) - PolicyUpdater._logger.info(log_line + " " + catch_up_result) + _LOGGER.info(log_line + " " + catch_up_result) except Exception as ex: error_msg = ("{0}: crash {1} {2} at {3}: {4}" .format(aud_catch_up.request_id, type(ex).__name__, str(ex), "on_catch_up", log_line + " " + catch_up_result)) - PolicyUpdater._logger.exception(error_msg) + _LOGGER.exception(error_msg) aud_catch_up.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) aud_catch_up.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) aud_catch_up.audit_done(result=error_msg) @@ -433,9 +343,8 @@ class PolicyUpdater(Thread): self._run_catch_up_timer() - PolicyUpdater._logger.info("policy_handler health: %s", - json.dumps(aud_catch_up.health(full=True))) - PolicyUpdater._logger.info("process_info: %s", json.dumps(aud_catch_up.process_info())) + _LOGGER.info("policy_handler health: %s", json.dumps(aud_catch_up.health(full=True))) + _LOGGER.info("process_info: %s", json.dumps(aud_catch_up.process_info())) return success @@ -443,42 +352,39 @@ class PolicyUpdater(Thread): """handle the event of policy-updates""" result = "" with self._lock: - audit, policies_updated, policies_removed = self._policy_update.pop_policy_update() + audit, policies_updated, policies_removed = self._policy_updates.pop_policy_updates() if not audit: return log_line = "request_id: {} policies_updated: {} policies_removed: {}".format( audit.request_id, json.dumps(policies_updated), json.dumps(policies_removed)) - PolicyUpdater._logger.info(log_line) + _LOGGER.info(log_line) try: not_found_ok = None (updated_policies, removed_policies, - policy_filter_matches) = PolicyMatcher.match_to_deployed_policies( + policy_filter_matches) = pdp_client.PolicyMatcher.match_to_deployed_policies( audit, policies_updated, policies_removed) if audit.is_not_found(): not_found_ok = True elif updated_policies or removed_policies: - updated_policies, removed_policies = PolicyRest.get_latest_updated_policies( - (audit, - [(policy_id, policy.get(POLICY_BODY, {}).get(POLICY_VERSION)) - for policy_id, policy in updated_policies.items()], - [(policy_id, policy.get(POLICY_NAMES, {})) - for policy_id, policy in removed_policies.items()] - )) + (updated_policies, + removed_policies) = pdp_client.PolicyRest.get_latest_updated_policies( + audit, updated_policies, removed_policies + ) if not_found_ok: result = ("- not sending policy-updates to deployment-handler " "- no deployed policies or policy-filters") - PolicyUpdater._logger.warning(result) + _LOGGER.warning(result) elif not audit.is_success(): result = "- not sending policy-updates to deployment-handler due to errors" - PolicyUpdater._logger.warning(result) + _LOGGER.warning(result) elif not updated_policies and not removed_policies: result = "- not sending empty policy-updates to deployment-handler" - PolicyUpdater._logger.info(result) + _LOGGER.info(result) else: message = PolicyUpdateMessage(updated_policies, removed_policies, policy_filter_matches, False) @@ -493,19 +399,19 @@ class PolicyUpdater(Thread): log_line = "request_id[{}]: {}".format(audit.request_id, str(message)) if not audit.is_success(): result = "- failed to send to deployment-handler {}".format(log_updates) - PolicyUpdater._logger.warning(result) + _LOGGER.warning(result) else: result = "- sent to deployment-handler {}".format(log_updates) audit.audit_done(result=result) - PolicyUpdater._logger.info(log_line + " " + result) + _LOGGER.info(log_line + " " + result) except Exception as ex: error_msg = ("{0}: crash {1} {2} at {3}: {4}" .format(audit.request_id, type(ex).__name__, str(ex), "on_policies_update", log_line + " " + result)) - PolicyUpdater._logger.exception(error_msg) + _LOGGER.exception(error_msg) audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) @@ -517,7 +423,7 @@ class PolicyUpdater(Thread): def shutdown(self, audit): """Shutdown the policy-updater""" - PolicyUpdater._logger.info("shutdown policy-updater") + _LOGGER.info("shutdown policy-updater") with self._lock: self._aud_shutdown = audit self._run.set() diff --git a/policyhandler/service_activator.py b/policyhandler/service_activator.py index d51d11c..9c8a1b2 100644 --- a/policyhandler/service_activator.py +++ b/policyhandler/service_activator.py @@ -14,7 +14,6 @@ # limitations under the License. # ============LICENSE_END========================================================= # -# ECOMP is a trademark and service mark of AT&T Intellectual Property. """ ask service_activator for the mode_of_operation @@ -30,23 +29,25 @@ """ import json -import logging from copy import deepcopy from urllib.parse import urljoin import requests from .config import Config, Settings -from .onap.audit import REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, Metrics +from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode, + Metrics) from .policy_consts import TARGET_ENTITY +from .utils import Utils +_LOGGER = Utils.get_logger(__file__) class ServiceActivator(object): """calling the service_activator web api to determine the mode_of_operation""" - _logger = logging.getLogger("policy_handler.service_activator") DEFAULT_TARGET_ENTITY = "service_activator" DEFAULT_TIMEOUT_IN_SECS = 10 MODE_OF_OPERATION_ACTIVE = "active" + SERVICE_MODE = "service_mode" _lazy_inited = False _settings = Settings(Config.MODE_OF_OPERATION, Config.SERVICE_ACTIVATOR) @@ -81,6 +82,7 @@ class ServiceActivator(object): """ ServiceActivator._custom_kwargs = {} ServiceActivator._url = ServiceActivator._url_register = "" + Audit.register_item_health(ServiceActivator.SERVICE_MODE, ServiceActivator._get_service_mode) try: _, ServiceActivator._mode_of_operation = ServiceActivator._settings.get_by_key( @@ -98,7 +100,7 @@ class ServiceActivator(object): tls_ca_mode = config_sa.get(Config.TLS_CA_MODE) ServiceActivator._custom_kwargs = Config.get_requests_kwargs(tls_ca_mode) - ServiceActivator._logger.info(audit.info( + _LOGGER.info(audit.info( "dns based routing to %s: url(%s) tls_ca_mode(%s) custom_kwargs(%s)", ServiceActivator._target_entity, ServiceActivator._url_register, tls_ca_mode, json.dumps(ServiceActivator._custom_kwargs))) @@ -134,17 +136,31 @@ class ServiceActivator(object): ServiceActivator._init(audit) @staticmethod - def is_active_mode_of_operation(audit): + def _get_service_mode(): + """returns the service_mode as json to be reported by the healthcheck""" + return { + "is_active_mode_of_operation": ServiceActivator.is_active_mode_of_operation(), + "is_pdp_api_default": Config.is_pdp_api_default(log_status=False) + } + + @staticmethod + def is_active_mode_of_operation(audit=None): """ mode_of_operation - whether the service is active == True or passive == False based on the current value of the mode_of_operation + + temporary for R4 Dublin - passive for new PDP API """ active = (ServiceActivator._mode_of_operation is None or ServiceActivator._mode_of_operation == ServiceActivator.MODE_OF_OPERATION_ACTIVE) - ServiceActivator._logger.info(audit.info( - "mode_of_operation = {} active = {}".format( + + if active and Config.is_pdp_api_default(): + active = False + + if audit: + _LOGGER.info(audit.info("mode_of_operation = {} active = {}".format( ServiceActivator._mode_of_operation, active))) return active @@ -157,8 +173,7 @@ class ServiceActivator(object): target_entity = ServiceActivator._target_entity if not ServiceActivator._url: - ServiceActivator._logger.info(audit.info( - "no url found for {}".format(target_entity))) + _LOGGER.info(audit.info("no url found for {}".format(target_entity))) return ServiceActivator.is_active_mode_of_operation(audit) url = ServiceActivator._url_register @@ -177,7 +192,7 @@ class ServiceActivator(object): json.dumps(custom_kwargs)) log_line = log_action + " " + log_data - ServiceActivator._logger.info(log_line) + _LOGGER.info(log_line) metrics.metrics_start(log_line) res = None @@ -190,7 +205,7 @@ class ServiceActivator(object): else AuditHttpCode.SERVER_INTERNAL_ERROR.value) error_msg = "failed to {} {}: {} {}".format( log_action, type(ex).__name__, str(ex), log_data) - ServiceActivator._logger.exception(error_msg) + _LOGGER.exception(error_msg) metrics.set_http_status_code(error_code) audit.set_http_status_code(error_code) metrics.metrics(error_msg) @@ -204,7 +219,7 @@ class ServiceActivator(object): metrics.metrics(log_line) if res.status_code != requests.codes.ok: - ServiceActivator._logger.error(log_line) + _LOGGER.error(log_line) return ServiceActivator.is_active_mode_of_operation(audit) result = res.json() or {} diff --git a/policyhandler/step_timer.py b/policyhandler/step_timer.py index 0f4f8e4..5ed4af5 100644 --- a/policyhandler/step_timer.py +++ b/policyhandler/step_timer.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,7 +14,6 @@ # limitations under the License. # ============LICENSE_END========================================================= # -# ECOMP is a trademark and service mark of AT&T Intellectual Property. """periodically callback""" @@ -22,6 +21,9 @@ import json from datetime import datetime from threading import Event, RLock, Thread +from .utils import Utils + +_LOGGER = Utils.get_logger(__file__) class StepTimer(Thread): """call on_time after interval number of seconds, then wait to continue""" @@ -32,12 +34,11 @@ class StepTimer(Thread): STATE_STOPPING = "stopping" STATE_STOPPED = "stopped" - def __init__(self, name, interval, on_time, logger, *args, **kwargs): + def __init__(self, name, interval, on_time, *args, **kwargs): """create step timer with controlled start. next step and pause""" Thread.__init__(self, name=name) self._interval = interval self._on_time = on_time - self._logger = logger self._args = args self._kwargs = kwargs @@ -110,8 +111,8 @@ class StepTimer(Thread): utcnow = datetime.utcnow() self._req_time = (utcnow - self._req_ts).total_seconds() self._req_ts = utcnow - self._logger.info("{0}[{1}] {2}->{3}".format( - self.name, self._req_time, prev_req, self.get_timer_status())) + _LOGGER.info("{}[{}] {}->{}".format(self.name, self._req_time, prev_req, + self.get_timer_status())) def _log_substep(self, substep): """log timer substep""" @@ -120,7 +121,8 @@ class StepTimer(Thread): utcnow = datetime.utcnow() self._substep_time = (utcnow - self._substep_ts).total_seconds() self._substep_ts = utcnow - self._logger.info("[{0}] {1}".format(self._substep_time, self.get_timer_status())) + _LOGGER.info("{}[{}] {}".format(self.name, self._substep_time, + self.get_timer_status())) def _on_time_event(self): """execute the _on_time event""" @@ -135,7 +137,7 @@ class StepTimer(Thread): error_msg = ("{0}: crash {1} {2} at {3}: args({4}), kwargs({5})" .format(self.name, type(ex).__name__, str(ex), "_on_time", json.dumps(self._args), json.dumps(self._kwargs))) - self._logger.exception(error_msg) + _LOGGER.exception(error_msg) def run(self): """loop one step a time until stopped=finished""" diff --git a/policyhandler/policy_utils.py b/policyhandler/utils.py index 08d26f0..d728e48 100644 --- a/policyhandler/policy_utils.py +++ b/policyhandler/utils.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,121 +14,44 @@ # limitations under the License. # ============LICENSE_END========================================================= # -# ECOMP is a trademark and service mark of AT&T Intellectual Property. -"""utils for policy usage and conversions""" +"""utils and conversions""" import json import logging -import re +import os from copy import deepcopy from typing import Pattern -from .policy_consts import (POLICY_BODY, POLICY_CONFIG, POLICY_ID, POLICY_NAME, - POLICY_VERSION) +class ToBeImplementedException(Exception): + """exception for to be implemented features of policy-handler""" + pass -class PolicyUtils(object): - """policy-client utils""" - _logger = logging.getLogger("policy_handler.policy_utils") - _policy_name_ext = re.compile('[.][0-9]+[.][a-zA-Z]+$') - - @staticmethod - def extract_policy_id(policy_name): - """ policy_name = policy_id + "." + <version> + "." + <extension> - For instance, - policy_name = DCAE_alex.Config_alex_policy_number_1.3.xml - policy_id = DCAE_alex.Config_alex_policy_number_1 - policy_scope = DCAE_alex - policy_class = Config - policy_version = 3 - type = extension = xml - delimiter = "." - policy_class_delimiter = "_" - policy_name in PAP = DCAE_alex.alex_policy_number_1 - """ - if not policy_name: - return - return PolicyUtils._policy_name_ext.sub('', policy_name) - - @staticmethod - def parse_policy_config(policy): - """try parsing the config in policy.""" - if not policy: - return policy - config = policy.get(POLICY_BODY, {}).get(POLICY_CONFIG) - if config: - policy[POLICY_BODY][POLICY_CONFIG] = Utils.safe_json_parse(config) - return policy - - @staticmethod - def convert_to_policy(policy_body): - """wrap policy_body received from policy-engine with policy_id.""" - if not policy_body: - return None - policy_name = policy_body.get(POLICY_NAME) - policy_version = policy_body.get(POLICY_VERSION) - if not policy_name or not policy_version: - return None - policy_id = PolicyUtils.extract_policy_id(policy_name) - if not policy_id: - return None - return {POLICY_ID:policy_id, POLICY_BODY:policy_body} - - @staticmethod - def select_latest_policy(policy_bodies, expected_versions=None, ignore_policy_names=None): - """For some reason, the policy-engine returns all version of the policy_bodies. - DCAE-Controller is only interested in the latest version - """ - if not policy_bodies: - return - latest_policy_body = {} - for policy_body in policy_bodies: - policy_name = policy_body.get(POLICY_NAME) - policy_version = policy_body.get(POLICY_VERSION) - if not policy_name or not policy_version or not policy_version.isdigit(): - continue - if expected_versions and policy_version not in expected_versions: - continue - if ignore_policy_names and policy_name in ignore_policy_names: - continue - - if (not latest_policy_body - or int(latest_policy_body[POLICY_VERSION]) < int(policy_version)): - latest_policy_body = policy_body - - return PolicyUtils.parse_policy_config(PolicyUtils.convert_to_policy(latest_policy_body)) - - @staticmethod - def select_latest_policies(policy_bodies): - """For some reason, the policy-engine returns all version of the policy_bodies. - DCAE-Controller is only interested in the latest versions - """ - if not policy_bodies: - return {} - policies = {} - for policy_body in policy_bodies: - policy = PolicyUtils.convert_to_policy(policy_body) - if not policy: - continue - policy_id = policy.get(POLICY_ID) - policy_version = policy.get(POLICY_BODY, {}).get(POLICY_VERSION) - if not policy_id or not policy_version or not policy_version.isdigit(): - continue - if (policy_id not in policies - or int(policy_version) > int(policies[policy_id][POLICY_BODY][POLICY_VERSION])): - policies[policy_id] = policy - - for policy_id in policies: - policies[policy_id] = PolicyUtils.parse_policy_config(policies[policy_id]) - - return policies - class Utils(object): """general purpose utils""" _logger = logging.getLogger("policy_handler.utils") @staticmethod + def get_logger(file_path): + """get the logger for the file_path == __file__""" + logger_path = [] + file_path = os.path.realpath(file_path) + logger_path.append(os.path.basename(file_path)[:-3]) + while file_path: + file_path = os.path.dirname(file_path) + folder_name = os.path.basename(file_path) + if folder_name == "policyhandler" or len(logger_path) > 5: + break + if folder_name == "tests": + logger_path.append("unit_test") + break + logger_path.append(folder_name) + + logger_path.append("policy_handler") + return logging.getLogger(".".join(reversed(logger_path))) + + @staticmethod def safe_json_parse(json_str): """try parsing json without exception - returns the json_str back if fails""" if not json_str: diff --git a/policyhandler/web_server.py b/policyhandler/web_server.py index dc76353..dfd1b51 100644 --- a/policyhandler/web_server.py +++ b/policyhandler/web_server.py @@ -14,29 +14,27 @@ # limitations under the License. # ============LICENSE_END========================================================= # -# ECOMP is a trademark and service mark of AT&T Intellectual Property. """web-server for policy_handler""" import json -import logging from datetime import datetime import cherrypy +from . import pdp_client from .config import Config from .deploy_handler import PolicyUpdateMessage from .onap.audit import Audit, AuditHttpCode -from .policy_matcher import PolicyMatcher from .policy_receiver import PolicyReceiver -from .policy_rest import PolicyRest +from .utils import Utils class PolicyWeb(object): """run http API of policy-handler on 0.0.0.0:wservice_port - any incoming address""" DATA_NOT_FOUND_ERROR = 404 HOST_INADDR_ANY = ".".join("0"*4) - logger = logging.getLogger("policy_handler.policy_web") + logger = Utils.get_logger(__file__) @staticmethod def run_forever(audit): @@ -84,7 +82,8 @@ class _PolicyWeb(object): PolicyWeb.logger.info("%s policy_id=%s headers=%s", req_info, policy_id, json.dumps(cherrypy.request.headers)) - latest_policy = PolicyRest.get_latest_policy((audit, policy_id, None, None)) or {} + latest_policy = pdp_client.PolicyRest.get_latest_policy( + (audit, policy_id, None, None)) or {} PolicyWeb.logger.info("res %s policy_id=%s latest_policy=%s", req_info, policy_id, json.dumps(latest_policy)) @@ -104,9 +103,9 @@ class _PolicyWeb(object): PolicyWeb.logger.info("%s", req_info) - result, policies, policy_filters = PolicyMatcher.get_deployed_policies(audit) + result, policies, policy_filters = pdp_client.PolicyMatcher.get_deployed_policies(audit) if not result: - result, policy_update = PolicyMatcher.build_catch_up_message( + result, policy_update = pdp_client.PolicyMatcher.build_catch_up_message( audit, policies, policy_filters) if policy_update and isinstance(policy_update, PolicyUpdateMessage): result["policy_update"] = policy_update.get_message() @@ -168,6 +167,9 @@ class _PolicyWeb(object): } } """ + if Config.is_pdp_api_default(): + raise cherrypy.HTTPError(404, "temporarily unsupported due to the new pdp API") + if cherrypy.request.method == "GET": return self._get_all_policies_latest() @@ -184,7 +186,7 @@ class _PolicyWeb(object): PolicyWeb.logger.info("%s: policy_filter=%s headers=%s", req_info, str_policy_filter, json.dumps(cherrypy.request.headers)) - result = PolicyRest.get_latest_policies(audit, policy_filter=policy_filter) or {} + result = pdp_client.PolicyRest.get_latest_policies(audit, policy_filter=policy_filter) or {} result_str = json.dumps(result, sort_keys=True) PolicyWeb.logger.info("result %s: policy_filter=%s result=%s", |