From 78ff88f9b3a3d32f941b3b9fedc2abfbaba291cb Mon Sep 17 00:00:00 2001 From: Alex Shatov Date: Thu, 27 Feb 2020 12:45:54 -0500 Subject: 5.1.0 policy-handler - policy-updates from new PDP DCAEGEN2-1851: - policy-handler now supports the policy-update notification from the new policy-engine thru DMaaP MR = no policy-filters - only policy-id values - see README for discoverable config settings of dmaap_mr client = DMaaP MR client has the same flexibility as policy_engine = set the query.timeout to high value like 15000 (default) - requests to DMaaP MR go through a single blocking connection - first catch-up only after draining the policy-updates from DMaaP MR on the first loop - safe parsing of messages from DMaaP MR - policy-engine changed the data type for policy-version field from int to string that is expected to have the semver value - related change to deployment-handler (DCAEGEN2-2085) has to be deployed to handle the non-numeric policyVersion - on new PDP API: http /policy_latest and policy-updates return the new data from the new PDP API with the following fields added/renamed by the policy-handler to keep other policy related parts intact in R4-R6 (see pdp_api/policy_utils.py) * policyName = policy_id + "." + policyVersion.replace(".","-") + ".xml" * policyVersion = str(metadata["policy-version"]) * "config" - is the renamed "properties" from the new PDP API response - enabled the /catch_up and the periodic auto-catch-up for the new PDP API - enabled GET /policies_latest - returns the latest policies for the deployed components - POST /policies_latest - still disabled since no support for the policy-filters is provided for the new PDP API - fixed hiding the Authorization value on comparing the configs - logging of secrets is now sha256 to see whether they changed - added X-ONAP-RequestID to headers the same way as X-ECOMP-RequestID - on policy-update process the removal first, then addition - changed the pool_connections=1 (number of pools) on PDP and DH sides == only a single destination is expected for each - log the exception as fatal into error.log - other minor fixes and refactoring - unit-test coverage 74% - integration testing is requested DCAEGEN2-1976: - policy-handler is enhanced to get user/password from env vars for PDP and DMaaP MR clients and overwriting the Authorization field in https headers received from the discoverable config = to override the Authorization value on policy_engine, set the environment vars $PDP_USER and $PDP_PWD in policy-handler container = to override the Authorization value on dmaap_mr, if using https and user-password authentication, set the environment vars $DMAAP_MR_USER and $DMAAP_MR_PWD in policy-handler container Change-Id: Iad8eab9e20e615a0e0d2822f4735dc64c50aa55c Signed-off-by: Alex Shatov Issue-ID: DCAEGEN2-1851 Issue-ID: DCAEGEN2-1976 --- policyhandler/config.py | 56 ++++- policyhandler/deploy_handler.py | 28 +-- policyhandler/discovery.py | 6 +- policyhandler/onap/audit.py | 51 +++-- policyhandler/pdp_api/dmaap_mr.py | 202 +++++++++++++++++ policyhandler/pdp_api/pdp_consts.py | 6 +- policyhandler/pdp_api/policy_listener.py | 176 +++++++++++++-- policyhandler/pdp_api/policy_matcher.py | 93 +++++++- policyhandler/pdp_api/policy_rest.py | 331 +++++++++++++++++++++++++--- policyhandler/pdp_api/policy_updates.py | 87 +++++++- policyhandler/pdp_api/policy_utils.py | 32 +-- policyhandler/pdp_api_v0/policy_listener.py | 8 +- policyhandler/pdp_api_v0/policy_matcher.py | 22 +- policyhandler/pdp_api_v0/policy_rest.py | 17 +- policyhandler/pdp_api_v0/policy_updates.py | 38 ++-- policyhandler/pdp_api_v0/policy_utils.py | 8 +- policyhandler/policy_receiver.py | 15 +- policyhandler/policy_updater.py | 33 ++- policyhandler/service_activator.py | 14 +- policyhandler/utils.py | 15 +- policyhandler/web_server.py | 14 +- 21 files changed, 1029 insertions(+), 223 deletions(-) create mode 100644 policyhandler/pdp_api/dmaap_mr.py (limited to 'policyhandler') diff --git a/policyhandler/config.py b/policyhandler/config.py index f8c425a..25ae3a5 100644 --- a/policyhandler/config.py +++ b/policyhandler/config.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ """read and use the config""" +import base64 import copy import json import logging @@ -137,6 +138,7 @@ class Config(object): FIELD_WSERVICE_PORT = "wservice_port" FIELD_TLS = "tls" FIELD_POLICY_ENGINE = "policy_engine" + DMAAP_MR = "dmaap_mr" POOL_CONNECTIONS = "pool_connections" DEPLOY_HANDLER = "deploy_handler" THREAD_POOL_SIZE = "thread_pool_size" @@ -155,6 +157,11 @@ class Config(object): SERVICE_ACTIVATOR = "service_activator" MODE_OF_OPERATION = "mode_of_operation" PDP_API_VERSION = "PDP_API_VERSION" + QUERY_TIMEOUT = "timeout" + PDP_USER = "PDP_USER" + PDP_PWD = "PDP_PWD" + DMAAP_MR_USER = "DMAAP_MR_USER" + DMAAP_MR_PWD = "DMAAP_MR_PWD" system_name = SERVICE_NAME_POLICY_HANDLER wservice_port = 25577 @@ -165,6 +172,8 @@ class Config(object): tls_server_cert_file = None tls_private_key_file = None tls_server_ca_chain_file = None + _pdp_authorization = None + _dmaap_mr_authorization = None _local_config = Settings() discovered_config = Settings() @@ -254,6 +263,18 @@ class Config(object): Config._pdp_api_version = os.environ.get( Config.PDP_API_VERSION, loaded_config.get(Config.PDP_API_VERSION.lower())) + pdp_user = os.environ.get(Config.PDP_USER) + pdp_pwd = os.environ.get(Config.PDP_PWD) + if pdp_user and pdp_pwd: + Config._pdp_authorization = "Basic {}".format(base64.b64encode( + ("{}:{}".format(pdp_user, pdp_pwd)).encode()).decode("utf-8")) + + dmaap_mr_user = os.environ.get(Config.DMAAP_MR_USER) + dmaap_mr_pwd = os.environ.get(Config.DMAAP_MR_PWD) + if dmaap_mr_user and dmaap_mr_pwd: + Config._dmaap_mr_authorization = "Basic {}".format(base64.b64encode( + ("{}:{}".format(dmaap_mr_user, dmaap_mr_pwd)).encode()).decode("utf-8")) + local_config = loaded_config.get(Config.SERVICE_NAME_POLICY_HANDLER, {}) Config.system_name = local_config.get(Config.FIELD_SYSTEM, Config.system_name) @@ -261,6 +282,26 @@ class Config(object): Config._local_config.set_config(local_config, auto_commit=True) + @staticmethod + def _overwrite_discovered_config(audit, discovered_config): + """replace the secrets in discovered_config with data from environment""" + changes = [] + if Config._pdp_authorization: + pdp_cfg = discovered_config.get("policy_engine", {}) + if pdp_cfg.get("url", "").lower().startswith("https:"): + pdp_cfg.get("headers", {})["Authorization"] = Config._pdp_authorization + changes.append("pdp_authorization") + + if Config._dmaap_mr_authorization: + dmaap_mr_cfg = discovered_config.get("dmaap_mr", {}) + if dmaap_mr_cfg.get("url", "").lower().startswith("https:"): + dmaap_mr_cfg.get("headers", {})["Authorization"] = Config._dmaap_mr_authorization + changes.append("dmaap_mr_authorization") + + if changes: + _LOGGER.info(audit.info("overwritten discovered config: {}".format(", ".join(changes)))) + + @staticmethod def discover(audit): """bring the config settings from the discovery service""" @@ -269,14 +310,17 @@ class Config(object): new_config = DiscoveryClient.get_value(audit, discovery_key) if not new_config or not isinstance(new_config, dict): - _LOGGER.warning("unexpected config from discovery: %s", new_config) + _LOGGER.warning(audit.warn("unexpected config from discovery: {}".format(new_config))) return - _LOGGER.debug("loaded config from discovery(%s): %s", - discovery_key, Audit.json_dumps(new_config)) + _LOGGER.debug(audit.debug("loaded config from discovery({}): {}".format( + discovery_key, Audit.json_dumps(new_config)))) + discovered_config = new_config.get(Config.SERVICE_NAME_POLICY_HANDLER) + + Config._overwrite_discovered_config(audit, discovered_config) - Config.discovered_config.set_config(new_config.get(Config.SERVICE_NAME_POLICY_HANDLER)) - _LOGGER.info("config from discovery: %s", Config.discovered_config) + Config.discovered_config.set_config(discovered_config) + _LOGGER.info(audit.info("config from discovery: {}".format(Config.discovered_config))) @staticmethod diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py index a127e54..997ec3e 100644 --- a/policyhandler/deploy_handler.py +++ b/policyhandler/deploy_handler.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -25,8 +25,7 @@ import requests from .config import Config, Settings from .discovery import DiscoveryClient -from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, - AuditResponseCode, Metrics) +from .onap.audit import AuditHttpCode, AuditResponseCode, Metrics from .policy_consts import (CATCH_UP, LATEST_POLICIES, POLICIES, POLICY_FILTER_MATCHES, POLICY_FILTERS, REMOVED_POLICIES, TARGET_ENTITY) @@ -172,10 +171,10 @@ class DeployHandler(object): changed, pool_size = DeployHandler._settings.get_by_key(Config.POOL_CONNECTIONS, 10) if changed: DeployHandler._requests_session.mount( - 'https://', requests.adapters.HTTPAdapter(pool_connections=pool_size, + 'https://', requests.adapters.HTTPAdapter(pool_connections=1, pool_maxsize=pool_size)) DeployHandler._requests_session.mount( - 'http://', requests.adapters.HTTPAdapter(pool_connections=pool_size, + 'http://', requests.adapters.HTTPAdapter(pool_connections=1, pool_maxsize=pool_size)) _, config_dh = DeployHandler._settings.get_by_key(Config.DEPLOY_HANDLER) @@ -301,7 +300,8 @@ class DeployHandler(object): metrics = Metrics(aud_parent=audit, targetEntity="{} policy_update".format(target_entity), targetServiceName=url) - headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id} + + headers = metrics.put_request_id_into_headers() log_action = "put to {} at {}".format(target_entity, url) log_data = "msg={} headers={}, params={}, timeout_in_secs={}, custom_kwargs({})".format( @@ -330,7 +330,7 @@ class DeployHandler(object): else AuditHttpCode.SERVER_INTERNAL_ERROR.value) error_msg = "failed to {} {}: {} {}".format( log_action, type(ex).__name__, str(ex), log_data) - _LOGGER.exception(error_msg) + _LOGGER.exception(metrics.fatal(error_msg)) metrics.set_http_status_code(error_code) audit.set_http_status_code(error_code) metrics.metrics(error_msg) @@ -371,7 +371,7 @@ class DeployHandler(object): metrics = Metrics(aud_parent=audit, targetEntity="{} get_deployed_policies".format(target_entity), targetServiceName=url) - headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id} + headers = metrics.put_request_id_into_headers() log_action = "get from {} at {}".format(target_entity, url) log_data = "headers={}, params={}, timeout_in_secs={}, custom_kwargs({})".format( @@ -387,7 +387,7 @@ class DeployHandler(object): metrics.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value) audit.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value) metrics.metrics(error_msg) - return None, None + return {"error": "failed to retrieve policies from deployment-handler"}, None, None res = None try: @@ -399,11 +399,11 @@ class DeployHandler(object): else AuditHttpCode.SERVER_INTERNAL_ERROR.value) error_msg = "failed to {} {}: {} {}".format( log_action, type(ex).__name__, str(ex), log_data) - _LOGGER.exception(error_msg) + _LOGGER.exception(metrics.fatal(error_msg)) metrics.set_http_status_code(error_code) audit.set_http_status_code(error_code) metrics.metrics(error_msg) - return None, None + return {"error": "failed to retrieve policies from deployment-handler"}, None, None metrics.set_http_status_code(res.status_code) audit.set_http_status_code(res.status_code) @@ -414,7 +414,7 @@ class DeployHandler(object): if res.status_code != requests.codes.ok: _LOGGER.error(log_line) - return None, None + return {"error": "failed to retrieve policies from deployment-handler"}, None, None result = res.json() or {} DeployHandler._server_instance_changed(result, metrics) @@ -426,10 +426,10 @@ class DeployHandler(object): _LOGGER.warning(audit.warn( "found no deployed policies or policy-filters: {}".format(log_line), error_code=AuditResponseCode.DATA_ERROR)) - return policies, policy_filters + return {"warning": "got no deployed policies"}, None, None _LOGGER.info(log_line) - return policies, policy_filters + return None, policies, policy_filters @staticmethod def _server_instance_changed(result, metrics): diff --git a/policyhandler/discovery.py b/policyhandler/discovery.py index 83f54ac..b6b6bbd 100644 --- a/policyhandler/discovery.py +++ b/policyhandler/discovery.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -85,7 +85,7 @@ class DiscoveryClient(object): else AuditHttpCode.SERVER_INTERNAL_ERROR.value) error_msg = ("failed {}/{} to {} {}: {}".format(status_code, error_code, log_line, type(ex).__name__, str(ex))) - _LOGGER.exception(error_msg) + _LOGGER.exception(metrics.fatal(error_msg)) metrics.set_http_status_code(error_code) audit.set_http_status_code(error_code) metrics.metrics(error_msg) @@ -136,7 +136,7 @@ class DiscoveryClient(object): else AuditHttpCode.SERVER_INTERNAL_ERROR.value) error_msg = ("failed {}/{} to {} {}: {}".format(status_code, error_code, log_line, type(ex).__name__, str(ex))) - _LOGGER.exception(error_msg) + _LOGGER.exception(metrics.fatal(error_msg)) metrics.set_http_status_code(error_code) audit.set_http_status_code(error_code) metrics.metrics(error_msg) diff --git a/policyhandler/onap/audit.py b/policyhandler/onap/audit.py index 3c09c16..269dfd8 100644 --- a/policyhandler/onap/audit.py +++ b/policyhandler/onap/audit.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -25,6 +25,7 @@ """ import copy +import hashlib import json import os import re @@ -41,6 +42,7 @@ from .health import Health from .process_info import ProcessInfo REQUEST_X_ECOMP_REQUESTID = "X-ECOMP-RequestID" +REQUEST_X_ONAP_REQUESTID = "X-ONAP-RequestID" REQUEST_REMOTE_ADDR = "Remote-Addr" REQUEST_HOST = "Host" HOSTNAME = "HOSTNAME" @@ -118,7 +120,7 @@ class AuditResponseCode(Enum): class _Audit(object): """put the audit object on stack per each initiating request in the system - :request_id: is the X-ECOMP-RequestID for tracing + :request_id: is the X-ONAP-RequestID or X-ECOMP-RequestID for tracing :req_message: is the request message string for logging @@ -172,7 +174,7 @@ class _Audit(object): """create audit object per each request in the system :job_name: is the name of the audit job for health stats - :request_id: is the X-ECOMP-RequestID for tracing + :request_id: is the X-ONAP-RequestID or X-ECOMP-RequestID for tracing :req_message: is the request message string for logging :kwargs: - put any request related params into kwargs """ @@ -184,6 +186,12 @@ class _Audit(object): self.max_http_status_code = 0 self._lock = threading.Lock() + def put_request_id_into_headers(self, headers=None): + """when sending message out - put the request_id into headers""" + headers = headers or {} + headers[REQUEST_X_ONAP_REQUESTID] = self.request_id + headers[REQUEST_X_ECOMP_REQUESTID] = self.request_id + return headers @staticmethod def register_item_health(health_name, health_getter=None): @@ -241,6 +249,8 @@ class _Audit(object): def set_http_status_code(self, http_status_code): """accumulate the highest(worst) http status code""" + if http_status_code is None: + http_status_code = AuditHttpCode.SERVER_INTERNAL_ERROR.value with self._lock: if self.max_http_status_code < AuditHttpCode.SERVER_INTERNAL_ERROR.value: self.max_http_status_code = max(http_status_code, self.max_http_status_code) @@ -308,9 +318,7 @@ class _Audit(object): """debug+error - the warn level of logging""" all_kwargs = self.merge_all_kwargs(**kwargs) - if error_code and isinstance(error_code, AuditResponseCode): - all_kwargs[ERROR_CODE] = error_code.value - all_kwargs[ERROR_DESCRIPTION] = AuditResponseCode.get_human_text(error_code) + self._set_error_code_in_kwargs(error_code, all_kwargs) _Audit._logger_debug.warn(log_line, **all_kwargs) _Audit._logger_error.warn(log_line, **all_kwargs) @@ -320,9 +328,7 @@ class _Audit(object): """debug+error - the error level of logging""" all_kwargs = self.merge_all_kwargs(**kwargs) - if error_code and isinstance(error_code, AuditResponseCode): - all_kwargs[ERROR_CODE] = error_code.value - all_kwargs[ERROR_DESCRIPTION] = AuditResponseCode.get_human_text(error_code) + self._set_error_code_in_kwargs(error_code, all_kwargs) _Audit._logger_debug.error(log_line, **all_kwargs) _Audit._logger_error.error(log_line, **all_kwargs) @@ -332,25 +338,32 @@ class _Audit(object): """debug+error - the fatal level of logging""" all_kwargs = self.merge_all_kwargs(**kwargs) - if error_code and isinstance(error_code, AuditResponseCode): - all_kwargs[ERROR_CODE] = error_code.value - all_kwargs[ERROR_DESCRIPTION] = AuditResponseCode.get_human_text(error_code) + self._set_error_code_in_kwargs(error_code, all_kwargs) _Audit._logger_debug.fatal(log_line, **all_kwargs) _Audit._logger_error.fatal(log_line, **all_kwargs) return log_line + def _set_error_code_in_kwargs(self, error_code, all_kwargs): + """set the error code and description in kwargs for logging""" + if not error_code or not isinstance(error_code, AuditResponseCode): + error_code = AuditResponseCode.UNKNOWN_ERROR + all_kwargs[ERROR_CODE] = error_code.value + all_kwargs[ERROR_DESCRIPTION] = AuditResponseCode.get_human_text(error_code) + @staticmethod def hide_secrets(obj): """hides the known secret field values of the dictionary""" if not isinstance(obj, dict): return obj - for key in obj: + for key, val in obj.items(): if key.lower() in [HEADER_CLIENTAUTH, HEADER_AUTHORIZATION]: - obj[key] = "*" - elif isinstance(obj[key], dict): - obj[key] = _Audit.hide_secrets(obj[key]) + hval = hashlib.sha256() + hval.update(val.encode()) + obj[key] = "***({})***".format(hval.hexdigest()) + elif isinstance(val, dict): + obj[key] = _Audit.hide_secrets(val) return obj @@ -375,7 +388,7 @@ class Audit(_Audit): """create audit object per each request in the system :job_name: is the name of the audit job for health stats - :request_id: is the X-ECOMP-RequestID for tracing + :request_id: is the X-ONAP-RequestID or X-ECOMP-RequestID for tracing :req_message: is the request message string for logging :aud_parent: is the parent Audit - used for sub-query metrics to other systems :kwargs: - put any request related params into kwargs @@ -388,7 +401,9 @@ class Audit(_Audit): headers = self.kwargs.get("headers", {}) if headers: if not self.request_id: - self.request_id = headers.get(REQUEST_X_ECOMP_REQUESTID) + self.request_id = headers.get(REQUEST_X_ONAP_REQUESTID, + headers.get(REQUEST_X_ECOMP_REQUESTID)) + self.kwargs.setdefault(AUDIT_IPADDRESS, headers.get(REQUEST_REMOTE_ADDR)) self.kwargs.setdefault(AUDIT_SERVER, headers.get(REQUEST_HOST)) diff --git a/policyhandler/pdp_api/dmaap_mr.py b/policyhandler/pdp_api/dmaap_mr.py new file mode 100644 index 0000000..2d4d468 --- /dev/null +++ b/policyhandler/pdp_api/dmaap_mr.py @@ -0,0 +1,202 @@ +# ================================================================================ +# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# + +"""policy-client communicates with policy-engine thru REST API""" + +import copy +import json +from threading import Lock + +import requests + +from ..config import Config, Settings +from ..onap.audit import AuditHttpCode, AuditResponseCode, Metrics +from ..utils import Utils + +_LOGGER = Utils.get_logger(__file__) + +class DmaapMr(object): + """using the http API to policy-engine""" + _lazy_inited = False + DEFAULT_TIMEOUT_IN_SECS = 60 + + _lock = Lock() + _settings = Settings(Config.DMAAP_MR) + + _requests_session = None + _long_polling = False + _target_entity = None + _url = None + _query = {} + _headers = None + _custom_kwargs = {} + _timeout_in_secs = DEFAULT_TIMEOUT_IN_SECS + + @staticmethod + def _init(audit): + """init static config""" + DmaapMr._custom_kwargs = {} + tls_ca_mode = None + + if not DmaapMr._requests_session: + DmaapMr._requests_session = requests.Session() + DmaapMr._requests_session.mount( + 'https://', requests.adapters.HTTPAdapter(pool_connections=1, pool_maxsize=1, + pool_block=True)) + DmaapMr._requests_session.mount( + 'http://', requests.adapters.HTTPAdapter(pool_connections=1, pool_maxsize=1, + pool_block=True)) + + + _, config = DmaapMr._settings.get_by_key(Config.DMAAP_MR) + if config: + DmaapMr._url = config.get("url") + DmaapMr._headers = config.get("headers", {}) + DmaapMr._query = copy.deepcopy(config.get("query", {})) + if DmaapMr._query.get(Config.QUERY_TIMEOUT, 0) < 1000: + DmaapMr._query[Config.QUERY_TIMEOUT] = 15000 + + DmaapMr._target_entity = config.get("target_entity", Config.DMAAP_MR) + + tls_ca_mode = config.get(Config.TLS_CA_MODE) + DmaapMr._custom_kwargs = Config.get_requests_kwargs(tls_ca_mode) + DmaapMr._timeout_in_secs = config.get(Config.TIMEOUT_IN_SECS) + if not DmaapMr._timeout_in_secs or DmaapMr._timeout_in_secs < 1: + DmaapMr._timeout_in_secs = DmaapMr.DEFAULT_TIMEOUT_IN_SECS + + _LOGGER.info( + audit.info(("config DMaaP MR({}) url({}) query({}) headers({}) " + "tls_ca_mode({}) custom_kwargs({}) timeout_in_secs({}): {}").format( + DmaapMr._target_entity, DmaapMr._url, + Metrics.json_dumps(DmaapMr._query), + Metrics.json_dumps(DmaapMr._headers), tls_ca_mode, + json.dumps(DmaapMr._custom_kwargs), DmaapMr._timeout_in_secs, + DmaapMr._settings))) + + DmaapMr._settings.commit_change() + DmaapMr._lazy_inited = True + + @staticmethod + def reconfigure(audit): + """reconfigure""" + with DmaapMr._lock: + DmaapMr._settings.set_config(Config.discovered_config) + if not DmaapMr._settings.is_changed(): + DmaapMr._settings.commit_change() + return False + + DmaapMr._lazy_inited = False + DmaapMr._long_polling = False + DmaapMr._init(audit) + return True + + @staticmethod + def _lazy_init(audit): + """init static config""" + if DmaapMr._lazy_inited: + return + + with DmaapMr._lock: + if DmaapMr._lazy_inited: + return + + DmaapMr._settings.set_config(Config.discovered_config) + DmaapMr._long_polling = False + DmaapMr._init(audit) + + @staticmethod + def get_policy_updates(audit): + """ + get from DMaaP MR - returns json list of stringified messages + + example [ + "{\"deployed-policies\":[ + {\"policy-type\":\"onap.policies.monitoring.cdap.tca.hi.lo.app\", + \"policy-type-version\":\"1.0.0\", + \"policy-id\":\"onap.scaleout.tca\", + \"policy-version\":\"2.2.2\", + \"success-count\":3, + \"failure-count\":0 + }], + \"undeployed-policies\":[ + {\"policy-type\":\"onap.policies.monitoring.cdap.tca.hi.lo.app\", + \"policy-type-version\":\"1.0.0\", + \"policy-id\":\"onap.scaleout.tca\", + \"policy-version\":\"1.0.0\", + \"success-count\":3, + \"failure-count\":0 + }]}" + ] + """ + DmaapMr._lazy_init(audit) + + if not DmaapMr._url: + _LOGGER.error( + audit.error("no url for DMaaP MR", error_code=AuditResponseCode.AVAILABILITY_ERROR)) + audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + return None + + with DmaapMr._lock: + target_entity = DmaapMr._target_entity + url = DmaapMr._url + params = copy.deepcopy(DmaapMr._query) if DmaapMr._long_polling else None + headers = copy.deepcopy(DmaapMr._headers) + timeout_in_secs = DmaapMr._timeout_in_secs + custom_kwargs = copy.deepcopy(DmaapMr._custom_kwargs) + DmaapMr._long_polling = True + + metrics = Metrics(aud_parent=audit, targetEntity=target_entity, targetServiceName=url) + + headers = metrics.put_request_id_into_headers(headers) + + log_line = ( + "get from {} at {} with params={}, headers={}, custom_kwargs({}) timeout_in_secs({})" + .format(target_entity, url, json.dumps(params), Metrics.json_dumps(headers), + json.dumps(custom_kwargs), timeout_in_secs)) + + _LOGGER.info(metrics.metrics_start(log_line)) + + res = None + try: + res = DmaapMr._requests_session.get(url, params=params, headers=headers, + timeout=timeout_in_secs, **custom_kwargs) + + except Exception as ex: + error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value + if isinstance(ex, requests.exceptions.RequestException) + else AuditHttpCode.SERVER_INTERNAL_ERROR.value) + error_msg = ("failed {}: {} to {}".format(type(ex).__name__, str(ex), log_line)) + + _LOGGER.exception(metrics.fatal(error_msg)) + metrics.set_http_status_code(error_code) + audit.set_http_status_code(error_code) + metrics.metrics(error_msg) + return None + + log_line = "response {} from {}: text={} headers={}".format( + res.status_code, log_line, res.text, Metrics.json_dumps(dict(res.headers.items()))) + _LOGGER.info(log_line) + + metrics.set_http_status_code(res.status_code) + audit.set_http_status_code(res.status_code) + metrics.metrics(log_line) + + policy_updates = None + if res.status_code == requests.codes.ok: + policy_updates = res.json() + + return policy_updates diff --git a/policyhandler/pdp_api/pdp_consts.py b/policyhandler/pdp_api/pdp_consts.py index 2337456..32f80f3 100644 --- a/policyhandler/pdp_api/pdp_consts.py +++ b/policyhandler/pdp_api/pdp_consts.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -33,3 +33,7 @@ PDP_REQ_ONAP_NAME = "ONAPName" # always "DCAE" PDP_REQ_ONAP_COMPONENT = "ONAPComponent" PDP_REQ_ONAP_INSTANCE = "ONAPInstance" PDP_REQ_RESOURCE = "resource" + +# fields from policy-update notification +DEPLOYED_POLICIES = "deployed-policies" +UNDEPLOYED_POLICIES = "undeployed-policies" diff --git a/policyhandler/pdp_api/policy_listener.py b/policyhandler/pdp_api/policy_listener.py index 9fa4695..0d33785 100644 --- a/policyhandler/pdp_api/policy_listener.py +++ b/policyhandler/pdp_api/policy_listener.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,39 +17,173 @@ """ policy-listener communicates with policy-engine -to receive push notifications +to receive push notifications through DMaaP MR on updates and removal of policies. -on receiving the policy-notifications, the policy-receiver +on receiving the policy-notifications, the policy-listener passes the notifications to policy-updater """ +import json import os +from threading import Event, Lock, Thread -from ..utils import ToBeImplementedException, Utils +from ..onap.audit import Audit, AuditResponseCode +from ..utils import Utils +from .dmaap_mr import DmaapMr +from .pdp_consts import (DEPLOYED_POLICIES, PDP_METADATA, PDP_POLICY_ID, + PDP_POLICY_VERSION, UNDEPLOYED_POLICIES) _LOGGER = Utils.get_logger(__file__) -class PolicyListener(object): - """listener to PolicyEngine""" +class PolicyListener(Thread): + """listener to DMaaP MR""" PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__))) + SLEEP_BEFORE_RESTARTING = 30 - def __init__(self, *_): + def __init__(self, audit, policy_updater): """listener to receive the policy notifications from PolicyEngine""" - _LOGGER.info("to_be_implemented") - raise ToBeImplementedException() + Thread.__init__(self, name="policy_listener", daemon=True) - def reconfigure(self, _): - """configure and reconfigure the listener""" - _LOGGER.info("to_be_implemented") - raise ToBeImplementedException() + self._policy_updater = policy_updater + self._lock = Lock() + self._run_event = Event() + self._keep_running = True + self._first_loop = True + + self._dmaap_mr = None + self.reconfigure(audit) + + def reconfigure(self, audit): + """configure and reconfigure the DMaaP MR""" + reconfigured = DmaapMr.reconfigure(audit) + if reconfigured and not self._first_loop: + with self._lock: + self._first_loop = True + return reconfigured def run(self): - """listen on web-socket and pass the policy notifications to policy-updater""" - _LOGGER.info("to_be_implemented") - raise ToBeImplementedException() - - def shutdown(self, _): - """Shutdown the policy-listener""" - _LOGGER.info("to_be_implemented") - raise ToBeImplementedException() + """listen on DMaaP MR and pass the policy notifications to policy-updater""" + _LOGGER.info("starting policy_listener...") + delayed_restarting = False + while True: + if not self._get_keep_running(): + break + + if delayed_restarting: + _LOGGER.info( + "going to sleep for %s secs before restarting policy-notifications", + PolicyListener.SLEEP_BEFORE_RESTARTING) + + self._run_event.clear() + self._run_event.wait(PolicyListener.SLEEP_BEFORE_RESTARTING) + if not self._get_keep_running(): + break + + audit = Audit(job_name="policy_update", + req_message="waiting for policy-notifications...", + retry_get_config=True) + + policy_updates = DmaapMr.get_policy_updates(audit) + + if not self._get_keep_running(): + audit.audit_done(result="exiting policy_listener") + break + + delayed_restarting = not audit.is_success() + if self._first_loop: + policy_updater = None + with self._lock: + if self._first_loop: + self._first_loop = False + policy_updater = self._policy_updater + if policy_updater is not None: + audit.req_message = "first catch_up" + _LOGGER.info(audit.info("first catch_up - ignoring policy-updates: {}" + .format(json.dumps(policy_updates)))) + policy_updater.catch_up(audit) + elif not policy_updates: + _LOGGER.info(audit.info( + "no policy-updates: {}".format(json.dumps(policy_updates)))) + audit.audit_done(result="no policy-updates") + else: + self._on_policy_update_message(audit, policy_updates) + + _LOGGER.info("exit policy_listener") + + def _get_keep_running(self): + """thread-safe check whether to continue running""" + with self._lock: + keep_running = self._keep_running + return keep_running + + def _on_policy_update_message(self, audit, policy_updates): + """received the notification from PDP""" + try: + _LOGGER.info("Received notification message: %s", json.dumps(policy_updates)) + if not policy_updates: + return + + policies_updated = [] + + for idx, pdp_update_msg in enumerate(policy_updates): + pdp_update_msg = Utils.safe_json_parse(pdp_update_msg) + + if not pdp_update_msg or not isinstance(pdp_update_msg, dict): + _LOGGER.warning(audit.warn( + "unexpected message from PDP: {}".format(json.dumps(pdp_update_msg)), + error_code=AuditResponseCode.DATA_ERROR)) + continue + + _LOGGER.debug("raw policy_update[%s]: %s", idx, json.dumps(pdp_update_msg)) + + deployed_policies = [ + {PDP_METADATA: {PDP_POLICY_ID: p_deployed.get(PDP_POLICY_ID), + PDP_POLICY_VERSION: p_deployed.get(PDP_POLICY_VERSION)}} + for p_deployed in pdp_update_msg.get(DEPLOYED_POLICIES, []) + if (p_deployed.get(PDP_POLICY_ID) is not None + and p_deployed.get(PDP_POLICY_VERSION) is not None)] + + undeployed_policies = [ + {PDP_METADATA: {PDP_POLICY_ID: p_undeployed.get(PDP_POLICY_ID), + PDP_POLICY_VERSION: p_undeployed.get(PDP_POLICY_VERSION)}} + for p_undeployed in pdp_update_msg.get(UNDEPLOYED_POLICIES, []) + if (p_undeployed.get(PDP_POLICY_ID) is not None + and p_undeployed.get(PDP_POLICY_VERSION) is not None)] + + if not deployed_policies and not undeployed_policies: + _LOGGER.warning(audit.warn( + "no policy deployed or undeployed: {}".format(json.dumps(pdp_update_msg)), + error_code=AuditResponseCode.DATA_ERROR)) + continue + + policy_update = {DEPLOYED_POLICIES: deployed_policies, + UNDEPLOYED_POLICIES: undeployed_policies} + _LOGGER.info(audit.info("policy_update[{}]: {}" + .format(idx, json.dumps(policy_update)))) + + policies_updated.append(policy_update) + + if not policies_updated: + _LOGGER.warning(audit.warn( + "erroneous notification from PDP: {}".format(json.dumps(policy_updates)), + error_code=AuditResponseCode.DATA_ERROR)) + return + + with self._lock: + policy_updater = self._policy_updater + if policy_updater is not None: + policy_updater.policy_update(audit, policies_updated) + except Exception as ex: + error_msg = "crash {} {} at {}: {}".format(type(ex).__name__, str(ex), + "on_policy_update_message", + json.dumps(policy_updates)) + _LOGGER.exception(audit.fatal(error_msg)) + + def shutdown(self, audit): + """Shutdown the policy_listener""" + _LOGGER.info(audit.info("shutdown policy_listener - no waiting...")) + with self._lock: + self._keep_running = False + self._policy_updater = None + self._run_event.set() diff --git a/policyhandler/pdp_api/policy_matcher.py b/policyhandler/pdp_api/policy_matcher.py index 57258c3..2972fb8 100644 --- a/policyhandler/pdp_api/policy_matcher.py +++ b/policyhandler/pdp_api/policy_matcher.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,7 +19,98 @@ import os +from ..deploy_handler import DeployHandler, PolicyUpdateMessage +from ..policy_consts import (ERRORED_POLICIES, LATEST_POLICIES, POLICY_BODY, + POLICY_VERSIONS) +from ..utils import Utils +from .pdp_consts import POLICY_VERSION +from .policy_rest import PolicyRest + +_LOGGER = Utils.get_logger(__file__) class PolicyMatcher(object): """policy-matcher - static class""" + PENDING_UPDATE = "pending_update" PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__))) + + @staticmethod + def build_catch_up_message(audit, deployed_policies, _=None): + """find the latest policies from policy-engine for the deployed policies""" + + if not deployed_policies: + error_txt = "no deployed policies" + _LOGGER.warning(error_txt) + return {"error": error_txt}, None + + pdp_response = PolicyRest.get_latest_policies(audit, policy_ids=list(deployed_policies)) + + if not audit.is_success(): + error_txt = "failed to retrieve policies from policy-engine" + _LOGGER.warning(error_txt) + return {"error": error_txt}, None + + latest_policies = pdp_response.get(LATEST_POLICIES, {}) + errored_policies = pdp_response.get(ERRORED_POLICIES, {}) + + latest_policies, changed_policies = PolicyMatcher._match_policies( + latest_policies, deployed_policies) + + errored_policies = dict((policy_id, policy) + for (policy_id, policy) in errored_policies.items() + if deployed_policies.get(policy_id, {}).get(POLICY_VERSIONS)) + + removed_policies = dict( + (policy_id, True) + for (policy_id, deployed_policy) in deployed_policies.items() + if deployed_policy.get(POLICY_VERSIONS) + and policy_id not in latest_policies + and policy_id not in errored_policies + ) + + return ({LATEST_POLICIES: latest_policies, ERRORED_POLICIES: errored_policies}, + PolicyUpdateMessage(changed_policies, removed_policies)) + + @staticmethod + def match_to_deployed_policies(audit, policies_updated, policies_removed): + """match the policies_updated, policies_removed versus deployed policies""" + _, deployed_policies, _ = DeployHandler.get_deployed_policies(audit) + if not audit.is_success(): + return {}, {}, {} + + _, changed_policies = PolicyMatcher._match_policies(policies_updated, deployed_policies) + + policies_removed = dict((policy_id, policy) + for (policy_id, policy) in policies_removed.items() + if deployed_policies.get(policy_id, {}).get(POLICY_VERSIONS)) + + return changed_policies, policies_removed, {} + + + @staticmethod + def _match_policies(policies, deployed_policies): + """ + Match policies to deployed policies by policy_id. + + Also calculates the policies that changed in comparison to deployed policies + """ + matching_policies = {} + changed_policies = {} + + policies = policies or {} + deployed_policies = deployed_policies or {} + + for (policy_id, policy) in policies.items(): + new_version = policy.get(POLICY_BODY, {}).get(POLICY_VERSION) + deployed_policy = deployed_policies.get(policy_id) + + if deployed_policy: + matching_policies[policy_id] = policy + + policy_changed = (deployed_policy and new_version + and (deployed_policy.get(PolicyMatcher.PENDING_UPDATE) + or {new_version} ^ + deployed_policy.get(POLICY_VERSIONS, {}).keys())) + if policy_changed: + changed_policies[policy_id] = policy + + return matching_policies, changed_policies diff --git a/policyhandler/pdp_api/policy_rest.py b/policyhandler/pdp_api/policy_rest.py index 14d9296..0b33ccd 100644 --- a/policyhandler/pdp_api/policy_rest.py +++ b/policyhandler/pdp_api/policy_rest.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -20,16 +20,19 @@ import copy import json import os +import time import urllib.parse +from multiprocessing.dummy import Pool as ThreadPool from threading import Lock import requests from ..config import Config, Settings -from ..onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, - AuditResponseCode, Metrics) +from ..onap.audit import AuditHttpCode, AuditResponseCode, Metrics +from ..policy_consts import (ERRORED_POLICIES, LATEST_POLICIES, POLICY_BODY, + POLICY_ID, POLICY_NAMES) from ..utils import Utils -from .pdp_consts import PDP_POLICIES +from .pdp_consts import PDP_POLICIES, POLICY_NAME, POLICY_VERSION from .policy_utils import PolicyUtils _LOGGER = Utils.get_logger(__file__) @@ -37,11 +40,15 @@ _LOGGER = Utils.get_logger(__file__) class PolicyRest(object): """using the http API to policy-engine""" PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__))) - _lazy_inited = False + EXPECTED_VERSIONS = "expected_versions" + IGNORE_POLICY_NAMES = "ignore_policy_names" DEFAULT_TIMEOUT_IN_SECS = 60 + _lazy_inited = False _lock = Lock() - _settings = Settings(Config.FIELD_POLICY_ENGINE, Config.POOL_CONNECTIONS) + _settings = Settings(Config.FIELD_POLICY_ENGINE, Config.POOL_CONNECTIONS, + Config.THREAD_POOL_SIZE, + Config.POLICY_RETRY_COUNT, Config.POLICY_RETRY_SLEEP) _target_entity = None _requests_session = None @@ -49,6 +56,9 @@ class PolicyRest(object): _url_pdp_decision = None _headers = None _custom_kwargs = {} + _thread_pool_size = 4 + _policy_retry_count = 1 + _policy_retry_sleep = 0 _timeout_in_secs = DEFAULT_TIMEOUT_IN_SECS @staticmethod @@ -63,10 +73,10 @@ class PolicyRest(object): changed, pool_size = PolicyRest._settings.get_by_key(Config.POOL_CONNECTIONS, 20) if changed: PolicyRest._requests_session.mount( - 'https://', requests.adapters.HTTPAdapter(pool_connections=pool_size, + 'https://', requests.adapters.HTTPAdapter(pool_connections=1, pool_maxsize=pool_size)) PolicyRest._requests_session.mount( - 'http://', requests.adapters.HTTPAdapter(pool_connections=pool_size, + 'http://', requests.adapters.HTTPAdapter(pool_connections=1, pool_maxsize=pool_size)) _, config = PolicyRest._settings.get_by_key(Config.FIELD_POLICY_ENGINE) @@ -77,6 +87,15 @@ class PolicyRest(object): PolicyRest._url, config.get("path_decision", "/decision/v1/")) PolicyRest._headers = config.get("headers", {}) PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE) + _, PolicyRest._thread_pool_size = PolicyRest._settings.get_by_key( + Config.THREAD_POOL_SIZE, 4) + if PolicyRest._thread_pool_size < 2: + PolicyRest._thread_pool_size = 2 + + _, PolicyRest._policy_retry_count = PolicyRest._settings.get_by_key( + Config.POLICY_RETRY_COUNT, 1) + _, PolicyRest._policy_retry_sleep = PolicyRest._settings.get_by_key( + Config.POLICY_RETRY_SLEEP, 0) tls_ca_mode = config.get(Config.TLS_CA_MODE) PolicyRest._custom_kwargs = Config.get_requests_kwargs(tls_ca_mode) @@ -121,13 +140,13 @@ class PolicyRest(object): PolicyRest._init() @staticmethod - def _pdp_get_decision(audit, pdp_req): - """Communication with the policy-engine""" + def _pdp_get_decision(audit, policy_ids): + """get policies from the policy-engine by policy-ids""" if not PolicyRest._url: _LOGGER.error( audit.error("no url for PDP", error_code=AuditResponseCode.AVAILABILITY_ERROR)) audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) - return None + return None, None with PolicyRest._lock: session = PolicyRest._requests_session @@ -137,9 +156,11 @@ class PolicyRest(object): headers = copy.deepcopy(PolicyRest._headers) custom_kwargs = copy.deepcopy(PolicyRest._custom_kwargs) + pdp_req = PolicyUtils.gen_req_to_pdp(policy_ids) + metrics = Metrics(aud_parent=audit, targetEntity=target_entity, targetServiceName=url) - headers[REQUEST_X_ECOMP_REQUESTID] = metrics.request_id + headers = metrics.put_request_id_into_headers(headers) log_action = "post to {} at {}".format(target_entity, url) log_data = "msg={} headers={}, custom_kwargs({}) timeout_in_secs({})".format( @@ -159,11 +180,11 @@ class PolicyRest(object): else AuditHttpCode.SERVER_INTERNAL_ERROR.value) error_msg = ("failed {}: {} to {}".format(type(ex).__name__, str(ex), log_line)) - _LOGGER.exception(error_msg) + _LOGGER.exception(metrics.fatal(error_msg)) metrics.set_http_status_code(error_code) audit.set_http_status_code(error_code) metrics.metrics(error_msg) - return None + return (error_code, None) log_line = "response {} from {}: text={} headers={}".format( res.status_code, log_line, res.text, @@ -174,40 +195,282 @@ class PolicyRest(object): audit.set_http_status_code(res.status_code) metrics.metrics(log_line) - policy_bodies = None + latest_policies = None if res.status_code == requests.codes.ok: - policy_bodies = res.json().get(PDP_POLICIES) + policy_bodies = res.json().get(PDP_POLICIES, {}) + latest_policies = dict((policy_id, PolicyUtils.convert_to_policy(policy)) + for (policy_id, policy) in policy_bodies.items()) + + return res.status_code, latest_policies - return policy_bodies @staticmethod def get_latest_policy(aud_policy_id): """safely try retrieving the latest policy for the policy_id from the policy-engine""" - audit, policy_id, _, _ = aud_policy_id + audit, policy_id, expected_versions, ignore_policy_names = aud_policy_id + str_metrics = "policy_id({0}), expected_versions({1}) ignore_policy_names({2})".format( + policy_id, json.dumps(expected_versions), json.dumps(ignore_policy_names)) + try: - PolicyRest._lazy_init() + return PolicyRest._get_latest_policy( + audit, policy_id, expected_versions, ignore_policy_names, str_metrics) + + except Exception as ex: + error_msg = ("{0}: crash {1} {2} at {3}: {4}" + .format(audit.request_id, type(ex).__name__, str(ex), + "get_latest_policy", str_metrics)) + + _LOGGER.exception(error_msg) + audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + return None - pdp_req = PolicyUtils.gen_req_to_pdp(policy_id) - policy_bodies = PolicyRest._pdp_get_decision(audit, pdp_req) - log_line = "looking for policy_id({}) in policy_bodies: {}".format( - policy_id, json.dumps(policy_bodies)) - _LOGGER.info(log_line) + @staticmethod + def _get_latest_policy(audit, policy_id, + expected_versions, ignore_policy_names, str_metrics): + """retry several times getting the latest policy for the policy_id from the policy-engine""" + PolicyRest._lazy_init() + latest_policy = None + status_code = 0 + retry_get_config = audit.kwargs.get("retry_get_config") - latest_policy = None - if policy_bodies and policy_id in policy_bodies: - latest_policy = PolicyUtils.convert_to_policy(policy_bodies[policy_id]) + for retry in range(1, PolicyRest._policy_retry_count + 1): + _LOGGER.debug("try(%s) retry_get_config(%s): %s", retry, retry_get_config, str_metrics) + + done, removed, latest_policy, status_code = PolicyRest._get_latest_policy_once( + audit, policy_id, expected_versions, ignore_policy_names) - if not PolicyUtils.validate_policy(latest_policy): + if removed: audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value) - _LOGGER.error(audit.error( - "received invalid policy from PDP: {}".format(json.dumps(latest_policy)), - error_code=AuditResponseCode.DATA_ERROR)) + return None + + if done or not retry_get_config or not PolicyRest._policy_retry_sleep: + break + + if retry == PolicyRest._policy_retry_count: + _LOGGER.error( + audit.error("gave up retrying after #{} for policy_id({}) from PDP {}" + .format(retry, policy_id, PolicyRest._url_pdp_decision), + error_code=AuditResponseCode.DATA_ERROR)) + break + + _LOGGER.warning(audit.warn( + "will retry({}) for policy_id({}) in {} secs from PDP {}".format( + retry, policy_id, PolicyRest._policy_retry_sleep, PolicyRest._url_pdp_decision), + error_code=AuditResponseCode.DATA_ERROR)) + time.sleep(PolicyRest._policy_retry_sleep) + + audit.set_http_status_code(status_code) + if not PolicyUtils.validate_policy(latest_policy): + audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value) + _LOGGER.error(audit.error( + "received invalid policy from PDP: {}".format(json.dumps(latest_policy)), + error_code=AuditResponseCode.DATA_ERROR)) + + return latest_policy + + @staticmethod + def _get_latest_policy_once(audit, policy_id, expected_versions, ignore_policy_names): + """single attempt to get the latest policy for the policy_id from the policy-engine""" + + status_code, latest_policies = PolicyRest._pdp_get_decision(audit, policy_id) + + if (ignore_policy_names and not expected_versions and not latest_policies + and AuditHttpCode.HTTP_OK.value == status_code): + return True, True, None, status_code + + log_line = "{} looking for policy_id({}) in latest_policies: {}".format( + status_code, policy_id, json.dumps(latest_policies)) + _LOGGER.info(log_line) + + latest_policy = (latest_policies or {}).get(policy_id) + + log_error = "" + if latest_policy: + policy_body = latest_policy.get(POLICY_BODY, {}) + policy_version = policy_body.get(POLICY_VERSION) + policy_name = policy_body.get(POLICY_NAME) + if expected_versions and policy_version not in expected_versions: + log_error = ("received unexpected policy version({}) instead of ({})" + " from PDP for policy_id={}: {}" + .format(policy_version, json.dumps(expected_versions), + policy_id, json.dumps(latest_policy))) + elif ignore_policy_names and policy_name in ignore_policy_names: + log_error = ("unexpectedly received policy version({}) from PDP" + " for policy_id={}: {}. to ignore-policy-names {}" + .format(policy_version, policy_id, + json.dumps(latest_policy), + json.dumps(ignore_policy_names))) + + if not latest_policy or log_error: + _LOGGER.error(audit.error( + log_error or "received unexpected policy data({}) from PDP for policy_id={}: {}" + .format(json.dumps(latest_policy), policy_id, json.dumps(latest_policies)), + error_code=AuditResponseCode.DATA_ERROR)) + latest_policy = None + + done = bool(latest_policy or audit.is_serious_error(status_code)) + return done, False, latest_policy, status_code + + @staticmethod + def get_latest_updated_policies(audit, updated_policies, removed_policies): + """safely try retrieving the latest policies for the list of policy_names""" + if not updated_policies and not removed_policies: + return None, None + + policies_updated = [(policy_id, policy.get(POLICY_BODY, {}).get(POLICY_VERSION)) + for policy_id, policy in updated_policies.items()] + policies_removed = [(policy_id, policy.get(POLICY_NAMES, {})) + for policy_id, policy in removed_policies.items()] + + if not policies_updated and not policies_removed: + return None, None + + str_metrics = "policies_updated[{0}]: {1} policies_removed[{2}]: {3}".format( + len(policies_updated), json.dumps(policies_updated), + len(policies_removed), json.dumps(policies_removed)) + + try: + return PolicyRest._get_latest_updated_policies( + audit, str_metrics, policies_updated, policies_removed) + + except Exception as ex: + error_msg = ("{0}: crash {1} {2} at {3}: {4}" + .format(audit.request_id, type(ex).__name__, str(ex), + "get_latest_updated_policies", str_metrics)) + + _LOGGER.exception(error_msg) + audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) + audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) + return None, None + + @staticmethod + def _get_latest_updated_policies(audit, str_metrics, policies_updated, policies_removed): + """Get the latest policies of the list of policy_names from the policy-engine""" + PolicyRest._lazy_init() + metrics_total = Metrics( + aud_parent=audit, + targetEntity="{0} total get_latest_updated_policies".format(PolicyRest._target_entity), + targetServiceName=PolicyRest._url_pdp_decision) + + metrics_total.metrics_start("get_latest_updated_policies {0}".format(str_metrics)) + _LOGGER.debug(str_metrics) + + policies_to_find = {} + for (policy_id, policy_version) in policies_updated: + if not policy_id or policy_version is None: + continue + policy = policies_to_find.get(policy_id) + if not policy: + policies_to_find[policy_id] = { + POLICY_ID: policy_id, + PolicyRest.EXPECTED_VERSIONS: {policy_version: True}, + PolicyRest.IGNORE_POLICY_NAMES: {} + } + continue + policy[PolicyRest.EXPECTED_VERSIONS][policy_version] = True + + for (policy_id, policy_names) in policies_removed: + if not policy_id: + continue + policy = policies_to_find.get(policy_id) + if not policy: + policies_to_find[policy_id] = { + POLICY_ID: policy_id, + PolicyRest.IGNORE_POLICY_NAMES: policy_names + } + continue + policy[PolicyRest.IGNORE_POLICY_NAMES].update(policy_names) + + apns = [(audit, policy_id, + policy_to_find.get(PolicyRest.EXPECTED_VERSIONS), + policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES)) + for (policy_id, policy_to_find) in policies_to_find.items()] + + policies = None + apns_length = len(apns) + _LOGGER.debug("apns_length(%s) policies_to_find %s", apns_length, + json.dumps(policies_to_find)) + + if apns_length == 1: + policies = [PolicyRest.get_latest_policy(apns[0])] + else: + pool = ThreadPool(min(PolicyRest._thread_pool_size, apns_length)) + policies = pool.map(PolicyRest.get_latest_policy, apns) + pool.close() + pool.join() + + metrics_total.metrics("result({}) get_latest_updated_policies {}: {} {}" + .format(apns_length, str_metrics, + len(policies), json.dumps(policies))) + + updated_policies = dict((policy[POLICY_ID], policy) + for policy in policies + if policy and policy.get(POLICY_ID)) + + removed_policies = dict((policy_id, True) + for (policy_id, policy_to_find) in policies_to_find.items() + if not policy_to_find.get(PolicyRest.EXPECTED_VERSIONS) + and policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES) + and policy_id not in updated_policies) + + errored_policies = dict((policy_id, policy_to_find) + for (policy_id, policy_to_find) in policies_to_find.items() + if policy_id not in updated_policies + and policy_id not in removed_policies) + + _LOGGER.debug( + "result(%s) updated_policies %s, removed_policies %s, errored_policies %s", + apns_length, json.dumps(updated_policies), json.dumps(removed_policies), + json.dumps(errored_policies)) + + if errored_policies: + audit.set_http_status_code(AuditHttpCode.DATA_ERROR.value) + audit.error( + "errored_policies in PDP: {}".format(json.dumps(errored_policies)), + error_code=AuditResponseCode.DATA_ERROR) + + return updated_policies, removed_policies + + + @staticmethod + def get_latest_policies(audit, policy_ids=None): + """Get the latest policies by policy-ids from the policy-engine""" + result = {} + str_policy_ids = json.dumps(policy_ids or []) + + try: + PolicyRest._lazy_init() + if policy_ids: + _, latest_policies = PolicyRest._pdp_get_decision(audit, policy_ids) + + if not latest_policies: + audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value) + _LOGGER.warning(audit.warn( + "received no policies from PDP for policy_ids {}" + .format(str_policy_ids), error_code=AuditResponseCode.DATA_ERROR)) + return latest_policies + + valid_policies = {} + errored_policies = {} + for (policy_id, policy) in latest_policies.items(): + if PolicyUtils.validate_policy(policy): + valid_policies[policy_id] = policy + else: + errored_policies[policy_id] = policy + + result[LATEST_POLICIES] = valid_policies + result[ERRORED_POLICIES] = errored_policies + + _LOGGER.debug("got policies for policy_ids: %s. result: %s", + str_policy_ids, json.dumps(result)) + return result - return latest_policy except Exception as ex: - error_msg = ("{}: get_latest_policy({}) crash {}: {}" - .format(audit.request_id, policy_id, type(ex).__name__, str(ex))) + error_msg = ("{}: crash {} {} at {}: {}" + .format(audit.request_id, type(ex).__name__, str(ex), + "get_latest_policies", str_policy_ids)) _LOGGER.exception(error_msg) audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR) diff --git a/policyhandler/pdp_api/policy_updates.py b/policyhandler/pdp_api/policy_updates.py index eb3c3d1..15f5b0a 100644 --- a/policyhandler/pdp_api/policy_updates.py +++ b/policyhandler/pdp_api/policy_updates.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,10 +17,13 @@ """policy-updates accumulates the policy-update notifications from PDP""" +import json import os -from ..utils import Utils, ToBeImplementedException - +from ..policy_consts import POLICY_BODY, POLICY_ID, POLICY_NAMES +from ..utils import Utils +from .pdp_consts import DEPLOYED_POLICIES, POLICY_NAME, UNDEPLOYED_POLICIES +from .policy_utils import PolicyUtils _LOGGER = Utils.get_logger(__file__) @@ -30,9 +33,12 @@ class PolicyUpdates(object): def __init__(self): """init and reset""" + self._audit = None + self._policies_updated = {} + self._policies_removed = {} def reset(self): - """resets the state""" + """resets the state - removes the pending policy-updates""" self.__init__() def pop_policy_updates(self): @@ -40,10 +46,71 @@ class PolicyUpdates(object): Returns the consolidated (audit, policies_updated, policies_removed) and resets the state """ - _LOGGER.info("to_be_implemented") - return None, None, None + if not self._audit: + return None, None, None + + audit = self._audit + policies_updated = self._policies_updated + policies_removed = self._policies_removed + + self.reset() + + return audit, policies_updated, policies_removed + + + def push_policy_updates(self, audit, multi_policies_updated): + """ + consolidate the new policies_updated, policies_removed to existing ones + + receives + :multi_policies_updated: as [ + {DEPLOYED_POLICIES: [{PDP_METADATA: {POLICY_ID: , + POLICY_VERSION: }}, ...], + UNDEPLOYED_POLICIES: [{PDP_METADATA: {POLICY_ID: , + POLICY_VERSION: }}, ...] + }, ...] + """ + for p_single_updated in multi_policies_updated: + for p_undeployed in p_single_updated.get(UNDEPLOYED_POLICIES, []): + policy = PolicyUtils.convert_to_policy(p_undeployed) + if not policy: + continue + policy_id = policy.get(POLICY_ID) + policy_name = policy.get(POLICY_BODY, {}).get(POLICY_NAME) + + if policy_id in self._policies_removed: + policy = self._policies_removed[policy_id] + + if POLICY_NAMES not in policy: + policy[POLICY_NAMES] = {} + policy[POLICY_NAMES][policy_name] = True + self._policies_removed[policy_id] = policy + + for p_deployed in p_single_updated.get(DEPLOYED_POLICIES, []): + policy = PolicyUtils.convert_to_policy(p_deployed) + if not policy: + continue + policy_id = policy.get(POLICY_ID) + policy_name = policy.get(POLICY_BODY, {}).get(POLICY_NAME) + + self._policies_updated[policy_id] = policy + + rm_policy_names = self._policies_removed.get(policy_id, {}).get(POLICY_NAMES) + if rm_policy_names and policy_name in rm_policy_names: + del rm_policy_names[policy_name] + + req_message = ("policy-update notification - updated[{}], removed[{}]" + .format(len(self._policies_updated), + len(self._policies_removed))) + + if not self._audit: + self._audit = audit + else: + audit.audit_done(result="policy-updates queued to request_id({})" + .format(self._audit.request_id)) + self._audit.req_message = req_message - def push_policy_updates(self, *_): - """consolidate the new policies_updated, policies_removed to existing ones""" - _LOGGER.info("to_be_implemented") - raise ToBeImplementedException() + _LOGGER.info( + "pending(%s) for %s policies_updated %s policies_removed %s", + self._audit.request_id, req_message, + json.dumps(self._policies_updated), json.dumps(self._policies_removed)) diff --git a/policyhandler/pdp_api/policy_utils.py b/policyhandler/pdp_api/policy_utils.py index 1d06d14..f2ed522 100644 --- a/policyhandler/pdp_api/policy_utils.py +++ b/policyhandler/pdp_api/policy_utils.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -30,14 +30,19 @@ class PolicyUtils(object): """policy-client utils""" @staticmethod - def gen_req_to_pdp(policy_id): - """request to get a single policy from pdp by policy_id""" + def gen_req_to_pdp(policy_ids): + """request to get policies from pdp by policy_id list or a single value""" + if not policy_ids: + policy_ids = [] + elif not isinstance(policy_ids, list): + policy_ids = [policy_ids] + return { PDP_REQ_ONAP_NAME: "DCAE", PDP_REQ_ONAP_COMPONENT: Audit.service_name, PDP_REQ_ONAP_INSTANCE: Audit.SERVICE_INSTANCE_UUID, "action": "configure", - PDP_REQ_RESOURCE: {PDP_POLICY_ID: [policy_id]} + PDP_REQ_RESOURCE: {PDP_POLICY_ID: policy_ids} } @staticmethod @@ -52,7 +57,7 @@ class PolicyUtils(object): "version": "1.0.0", "metadata": { "policy-id": "onap.scaleout.tca", - "policy-version": 1, + "policy-version": "1.2.3", "description": "The scaleout policy for vDNS" }, "properties": { @@ -73,13 +78,13 @@ class PolicyUtils(object): { "policy_id": "onap.scaleout.tca", "policy_body": { - "policyName": "onap.scaleout.tca.1.xml", - "policyVersion": 1, + "policyName": "onap.scaleout.tca.1-2-3.xml", + "policyVersion": "1.2.3", "type": "onap.policies.monitoring.cdap.tca.hi.lo.app", "version": "1.0.0", "metadata": { "policy-id": "onap.scaleout.tca", - "policy-version": 1, + "policy-version": "1.2.3", "description": "The scaleout policy for vDNS" }, "config": { @@ -97,19 +102,20 @@ class PolicyUtils(object): } } """ - if not policy_body or not policy_body.get(PDP_PROPERTIES): + if not policy_body: return None pdp_metadata = policy_body.get(PDP_METADATA, {}) policy_id = pdp_metadata.get(PDP_POLICY_ID) policy_version = pdp_metadata.get(PDP_POLICY_VERSION) - if not policy_id or not policy_version: + if not policy_id or policy_version is None: return None - policy_body[POLICY_NAME] = "{}.{}.xml".format(policy_id, policy_version) + policy_body[POLICY_NAME] = "{}.{}.xml".format(policy_id, policy_version.replace(".", "-")) policy_body[POLICY_VERSION] = str(policy_version) - policy_body[POLICY_CONFIG] = policy_body[PDP_PROPERTIES] - del policy_body[PDP_PROPERTIES] + if PDP_PROPERTIES in policy_body: + policy_body[POLICY_CONFIG] = policy_body[PDP_PROPERTIES] + del policy_body[PDP_PROPERTIES] return {POLICY_ID:policy_id, POLICY_BODY:policy_body} diff --git a/policyhandler/pdp_api_v0/policy_listener.py b/policyhandler/pdp_api_v0/policy_listener.py index 67e4c49..7525e4d 100644 --- a/policyhandler/pdp_api_v0/policy_listener.py +++ b/policyhandler/pdp_api_v0/policy_listener.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,7 +16,7 @@ # """ -policy-listener communicates with policy-engine +policy_listener communicates with policy-engine thru web-socket to receive push notifications on updates and removal of policies. @@ -298,8 +298,8 @@ class PolicyListener(Thread): def shutdown(self, audit): - """Shutdown the policy-listener""" - _LOGGER.info(audit.info("shutdown policy-listener")) + """Shutdown the policy_listener""" + _LOGGER.info(audit.info("shutdown policy_listener")) with self._lock: self._keep_running = False diff --git a/policyhandler/pdp_api_v0/policy_matcher.py b/policyhandler/pdp_api_v0/policy_matcher.py index 357af49..deb6619 100644 --- a/policyhandler/pdp_api_v0/policy_matcher.py +++ b/policyhandler/pdp_api_v0/policy_matcher.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -37,24 +37,6 @@ class PolicyMatcher(object): PENDING_UPDATE = "pending_update" PDP_API_FOLDER = os.path.basename(os.path.dirname(os.path.realpath(__file__))) - @staticmethod - def get_deployed_policies(audit): - """get the deployed policies and policy-filters""" - deployed_policies, deployed_policy_filters = DeployHandler.get_deployed_policies(audit) - - if audit.is_not_found(): - warning_txt = "got no deployed policies or policy-filters" - _LOGGER.warning(warning_txt) - return {"warning": warning_txt}, None, None - - if not audit.is_success() or (not deployed_policies and not deployed_policy_filters): - error_txt = "failed to retrieve policies from deployment-handler" - _LOGGER.error(error_txt) - return {"error": error_txt}, None, None - - return None, deployed_policies, deployed_policy_filters - - @staticmethod def build_catch_up_message(audit, deployed_policies, deployed_policy_filters): """ @@ -135,7 +117,7 @@ class PolicyMatcher(object): @staticmethod def match_to_deployed_policies(audit, policies_updated, policies_removed): """match the policies_updated, policies_removed versus deployed policies""" - deployed_policies, deployed_policy_filters = DeployHandler.get_deployed_policies(audit) + _, deployed_policies, deployed_policy_filters = DeployHandler.get_deployed_policies(audit) if not audit.is_success(): return {}, {}, {} diff --git a/policyhandler/pdp_api_v0/policy_rest.py b/policyhandler/pdp_api_v0/policy_rest.py index c59625e..30fc043 100644 --- a/policyhandler/pdp_api_v0/policy_rest.py +++ b/policyhandler/pdp_api_v0/policy_rest.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -28,8 +28,7 @@ from threading import Lock import requests from ..config import Config, Settings -from ..onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, - AuditResponseCode, Metrics) +from ..onap.audit import AuditHttpCode, AuditResponseCode, Metrics from ..policy_consts import (ERRORED_POLICIES, LATEST_POLICIES, POLICY_BODY, POLICY_FILTER, POLICY_FILTERS, POLICY_ID, POLICY_NAMES) @@ -84,10 +83,10 @@ class PolicyRest(object): changed, pool_size = PolicyRest._settings.get_by_key(Config.POOL_CONNECTIONS, 20) if changed: PolicyRest._requests_session.mount( - 'https://', requests.adapters.HTTPAdapter(pool_connections=pool_size, + 'https://', requests.adapters.HTTPAdapter(pool_connections=1, pool_maxsize=pool_size)) PolicyRest._requests_session.mount( - 'http://', requests.adapters.HTTPAdapter(pool_connections=pool_size, + 'http://', requests.adapters.HTTPAdapter(pool_connections=1, pool_maxsize=pool_size)) _, config = PolicyRest._settings.get_by_key(Config.FIELD_POLICY_ENGINE) @@ -159,7 +158,7 @@ class PolicyRest(object): _LOGGER.error( audit.error("no url for PDP", error_code=AuditResponseCode.AVAILABILITY_ERROR)) audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value) - return None + return None, None with PolicyRest._lock: session = PolicyRest._requests_session @@ -171,7 +170,7 @@ class PolicyRest(object): metrics = Metrics(aud_parent=audit, targetEntity=target_entity, targetServiceName=url) - headers[REQUEST_X_ECOMP_REQUESTID] = metrics.request_id + headers = metrics.put_request_id_into_headers(headers) log_action = "post to {} at {}".format(target_entity, url) log_data = "msg={} headers={}, custom_kwargs({}) timeout_in_secs({})".format( @@ -191,7 +190,7 @@ class PolicyRest(object): else AuditHttpCode.SERVER_INTERNAL_ERROR.value) error_msg = ("failed {}: {} to {}".format(type(ex).__name__, str(ex), log_line)) - _LOGGER.exception(error_msg) + _LOGGER.exception(metrics.fatal(error_msg)) metrics.set_http_status_code(error_code) audit.set_http_status_code(error_code) metrics.metrics(error_msg) @@ -412,7 +411,7 @@ class PolicyRest(object): policies_to_find = {} for (policy_id, policy_version) in policies_updated: - if not policy_id or not policy_version or not policy_version.isdigit(): + if not policy_id or policy_version is None or not policy_version.isdigit(): continue policy = policies_to_find.get(policy_id) if not policy: diff --git a/policyhandler/pdp_api_v0/policy_updates.py b/policyhandler/pdp_api_v0/policy_updates.py index eafdca2..ac68f4a 100644 --- a/policyhandler/pdp_api_v0/policy_updates.py +++ b/policyhandler/pdp_api_v0/policy_updates.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -40,7 +40,7 @@ class PolicyUpdates(object): self._policies_removed = {} def reset(self): - """resets the state""" + """resets the state - removes the pending policy-updates""" self.__init__() def pop_policy_updates(self): @@ -62,25 +62,12 @@ class PolicyUpdates(object): def push_policy_updates(self, policies_updated, policies_removed): """consolidate the new policies_updated, policies_removed to existing ones""" - for policy_body in policies_updated: - policy_name = policy_body.get(POLICY_NAME) - policy = PolicyUtils.convert_to_policy(policy_body) - if not policy: - continue - policy_id = policy.get(POLICY_ID) - - self._policies_updated[policy_id] = policy - - rm_policy_names = self._policies_removed.get(policy_id, {}).get(POLICY_NAMES) - if rm_policy_names and policy_name in rm_policy_names: - del rm_policy_names[policy_name] - for policy_body in policies_removed: - policy_name = policy_body.get(POLICY_NAME) policy = PolicyUtils.convert_to_policy(policy_body) if not policy: continue policy_id = policy.get(POLICY_ID) + policy_name = policy_body.get(POLICY_NAME) if policy_id in self._policies_removed: policy = self._policies_removed[policy_id] @@ -90,16 +77,27 @@ class PolicyUpdates(object): policy[POLICY_NAMES][policy_name] = True self._policies_removed[policy_id] = policy + for policy_body in policies_updated: + policy = PolicyUtils.convert_to_policy(policy_body) + if not policy: + continue + policy_id = policy.get(POLICY_ID) + policy_name = policy_body.get(POLICY_NAME) + + self._policies_updated[policy_id] = policy + + rm_policy_names = self._policies_removed.get(policy_id, {}).get(POLICY_NAMES) + if rm_policy_names and policy_name in rm_policy_names: + del rm_policy_names[policy_name] + req_message = ("policy-update notification - updated[{0}], removed[{1}]" .format(len(self._policies_updated), len(self._policies_removed))) if not self._audit: - self._audit = Audit(job_name="policy_update", - req_message=req_message, + self._audit = Audit(job_name="policy_update", req_message=req_message, retry_get_config=True) - else: - self._audit.req_message = req_message + self._audit.req_message = req_message _LOGGER.info( "pending(%s) for %s policies_updated %s policies_removed %s", diff --git a/policyhandler/pdp_api_v0/policy_utils.py b/policyhandler/pdp_api_v0/policy_utils.py index d337665..2cbb22c 100644 --- a/policyhandler/pdp_api_v0/policy_utils.py +++ b/policyhandler/pdp_api_v0/policy_utils.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -63,7 +63,7 @@ class PolicyUtils(object): return None policy_name = policy_body.get(POLICY_NAME) policy_version = policy_body.get(POLICY_VERSION) - if not policy_name or not policy_version: + if not policy_name or policy_version is None: return None policy_id = PolicyUtils.extract_policy_id(policy_name) if not policy_id: @@ -81,7 +81,7 @@ class PolicyUtils(object): for policy_body in policy_bodies: policy_name = policy_body.get(POLICY_NAME) policy_version = policy_body.get(POLICY_VERSION) - if not policy_name or not policy_version or not policy_version.isdigit(): + if not policy_name or policy_version is None or not policy_version.isdigit(): continue if expected_versions and policy_version not in expected_versions: continue @@ -108,7 +108,7 @@ class PolicyUtils(object): continue policy_id = policy.get(POLICY_ID) policy_version = policy.get(POLICY_BODY, {}).get(POLICY_VERSION) - if not policy_id or not policy_version or not policy_version.isdigit(): + if not policy_id or policy_version is None or not policy_version.isdigit(): continue if (policy_id not in policies or int(policy_version) > int(policies[policy_id][POLICY_BODY][POLICY_VERSION])): diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py index d949c4b..091c2c3 100644 --- a/policyhandler/policy_receiver.py +++ b/policyhandler/policy_receiver.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -24,13 +24,15 @@ on receiving the policy-notifications, the policy-receiver passes the notifications to policy-updater """ +from .config import Config from .service_activator import ServiceActivator + class PolicyReceiver(object): """ policy-receiver - static singleton wrapper around two threads policy_updater - master thread for all scheduled actions - policy_listener - listens to policy-engine through web-socket + policy_listener - listens to policy-engine through DMaaP MR or web-socket """ _policy_updater = None _policy_listener = None @@ -47,9 +49,9 @@ class PolicyReceiver(object): def _close_listener(audit): """stop the notification-handler""" if PolicyReceiver._policy_listener: - policy_receiver = PolicyReceiver._policy_listener + policy_listener = PolicyReceiver._policy_listener PolicyReceiver._policy_listener = None - policy_receiver.shutdown(audit) + policy_listener.shutdown(audit) @staticmethod def shutdown(audit): @@ -98,4 +100,7 @@ class PolicyReceiver(object): PolicyReceiver._policy_updater.start() - PolicyReceiver.catch_up(audit) + if Config.is_pdp_api_default(): + audit.audit_done(result="will catch_up after draining the policy-update queue") + else: + PolicyReceiver.catch_up(audit) diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py index 3fcde40..42c5ee6 100644 --- a/policyhandler/policy_updater.py +++ b/policyhandler/policy_updater.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -40,7 +40,7 @@ class PolicyUpdater(Thread): self._reconfigure_receiver = on_reconfigure_receiver self._lock = Lock() - self._run = Event() + self._run_event = Event() self._settings = Settings(CATCH_UP, Config.RECONFIGURE) self._catch_up_timer = None @@ -74,11 +74,11 @@ class PolicyUpdater(Thread): self._settings.commit_change() return True - def policy_update(self, policies_updated, policies_removed): + def policy_update(self, *args): """enqueue the policy-updates""" with self._lock: - self._policy_updates.push_policy_updates(policies_updated, policies_removed) - self._run.set() + self._policy_updates.push_policy_updates(*args) + self._run_event.set() def catch_up(self, audit=None): """need to bring the latest policies to DCAE-Controller""" @@ -89,7 +89,7 @@ class PolicyUpdater(Thread): "catch_up %s request_id %s", self._aud_catch_up.req_message, self._aud_catch_up.request_id ) - self._run.set() + self._run_event.set() def reconfigure(self, audit=None): """job to check for and bring in the updated config for policy-handler""" @@ -100,7 +100,7 @@ class PolicyUpdater(Thread): "%s request_id %s", self._aud_reconfigure.req_message, self._aud_reconfigure.request_id ) - self._run.set() + self._run_event.set() def run(self): """wait and run the policy-update in thread""" @@ -108,10 +108,10 @@ class PolicyUpdater(Thread): self._run_reconfigure_timer() while True: _LOGGER.info("waiting for policy-updates...") - self._run.wait() + self._run_event.wait() with self._lock: - self._run.clear() + self._run_event.clear() if not self._keep_running(): break @@ -126,7 +126,7 @@ class PolicyUpdater(Thread): self._on_policy_update() - _LOGGER.info("exit policy-updater") + _LOGGER.info("exit policy_updater") def _keep_running(self): """thread-safe check whether to continue running""" @@ -235,7 +235,7 @@ class PolicyUpdater(Thread): if self._reconfigure_receiver(aud_reconfigure): need_to_catch_up = True - changed_configs.append("web-socket") + changed_configs.append("policy-receiver") reconfigure_result = " -- config changed on {} changes: {}".format( json.dumps(changed_configs), Config.discovered_config) @@ -246,7 +246,7 @@ class PolicyUpdater(Thread): Config.discovered_config.commit_change() aud_reconfigure.audit_done(result=reconfigure_result) - _LOGGER.info(log_line + reconfigure_result) + _LOGGER.info("%s%s", log_line, reconfigure_result) if need_to_catch_up: self._pause_catch_up_timer() @@ -300,8 +300,7 @@ class PolicyUpdater(Thread): _LOGGER.info(log_line) self._pause_catch_up_timer() - (_, policies, - policy_filters) = pdp_client.PolicyMatcher.get_deployed_policies(aud_catch_up) + (_, policies, policy_filters) = DeployHandler.get_deployed_policies(aud_catch_up) catch_up_message = None if aud_catch_up.is_not_found(): @@ -422,11 +421,11 @@ class PolicyUpdater(Thread): def shutdown(self, audit): - """Shutdown the policy-updater""" - _LOGGER.info("shutdown policy-updater") + """Shutdown the policy_updater""" + _LOGGER.info("shutdown policy_updater") with self._lock: self._aud_shutdown = audit - self._run.set() + self._run_event.set() self._stop_timers() diff --git a/policyhandler/service_activator.py b/policyhandler/service_activator.py index c1e5b8c..1fab5c6 100644 --- a/policyhandler/service_activator.py +++ b/policyhandler/service_activator.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -36,8 +36,7 @@ import requests from .config import Config, Settings from .discovery import DiscoveryClient -from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode, - Metrics) +from .onap.audit import Audit, AuditHttpCode, Metrics from .policy_consts import TARGET_ENTITY from .utils import Utils @@ -153,16 +152,11 @@ class ServiceActivator(object): mode_of_operation - whether the service is active == True or passive == False based on the current value of the mode_of_operation - - temporary for R4 Dublin - passive for new PDP API """ active = (ServiceActivator._mode_of_operation is None or ServiceActivator._mode_of_operation == ServiceActivator.MODE_OF_OPERATION_ACTIVE) - if active and Config.is_pdp_api_default(): - active = False - if audit: _LOGGER.info(audit.info("mode_of_operation = {} active = {}".format( ServiceActivator._mode_of_operation, active))) @@ -188,7 +182,7 @@ class ServiceActivator(object): metrics = Metrics(aud_parent=audit, targetEntity="{} determine_mode_of_operation".format(target_entity), targetServiceName=url) - headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id} + headers = metrics.put_request_id_into_headers() log_action = "post to {} at {}".format(target_entity, url) log_data = "headers={}, json_body={}, timeout_in_secs={}, custom_kwargs({})".format( @@ -209,7 +203,7 @@ class ServiceActivator(object): else AuditHttpCode.SERVER_INTERNAL_ERROR.value) error_msg = "failed to {} {}: {} {}".format( log_action, type(ex).__name__, str(ex), log_data) - _LOGGER.exception(error_msg) + _LOGGER.exception(metrics.fatal(error_msg)) metrics.set_http_status_code(error_code) audit.set_http_status_code(error_code) metrics.metrics(error_msg) diff --git a/policyhandler/utils.py b/policyhandler/utils.py index d728e48..685d7d8 100644 --- a/policyhandler/utils.py +++ b/policyhandler/utils.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -23,11 +23,6 @@ import os from copy import deepcopy from typing import Pattern -class ToBeImplementedException(Exception): - """exception for to be implemented features of policy-handler""" - pass - - class Utils(object): """general purpose utils""" _logger = logging.getLogger("policy_handler.utils") @@ -91,6 +86,14 @@ class Utils(object): return False for key, val_1 in body_1.items(): + val_2 = body_2[key] + if isinstance(val_1, str) or isinstance(val_2, str): + if val_1 != val_2: + Utils._logger.debug("key-values %s != %s", + json_dumps({key: val_1}), json_dumps({key: val_2})) + return False + continue + if not Utils.are_the_same(val_1, body_2[key], json_dumps): return False return True diff --git a/policyhandler/web_server.py b/policyhandler/web_server.py index 9c2656e..f52f18e 100644 --- a/policyhandler/web_server.py +++ b/policyhandler/web_server.py @@ -1,5 +1,5 @@ # ================================================================================ -# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,15 +18,15 @@ """web-server for policy_handler""" import json -from datetime import datetime import os import time +from datetime import datetime import cherrypy from . import pdp_client from .config import Config -from .deploy_handler import PolicyUpdateMessage +from .deploy_handler import DeployHandler, PolicyUpdateMessage from .onap.audit import Audit, AuditHttpCode from .policy_receiver import PolicyReceiver from .utils import Utils @@ -120,7 +120,7 @@ class _PolicyWeb(object): PolicyWeb.logger.info("%s", req_info) - result, policies, policy_filters = pdp_client.PolicyMatcher.get_deployed_policies(audit) + result, policies, policy_filters = DeployHandler.get_deployed_policies(audit) if not result: result, policy_update = pdp_client.PolicyMatcher.build_catch_up_message( audit, policies, policy_filters) @@ -184,12 +184,12 @@ class _PolicyWeb(object): } } """ - if Config.is_pdp_api_default(): - raise cherrypy.HTTPError(404, "temporarily unsupported due to the new pdp API") - if cherrypy.request.method == "GET": return self._get_all_policies_latest() + if Config.is_pdp_api_default(): + raise cherrypy.HTTPError(404, "temporarily unsupported due to the new pdp API") + if cherrypy.request.method != "POST": raise cherrypy.HTTPError(404, "unexpected method {0}".format(cherrypy.request.method)) -- cgit 1.2.3-korg