From 6556fd79eb177d8ed7c390d56410b42afb4a0c70 Mon Sep 17 00:00:00 2001 From: Alex Shatov Date: Fri, 14 Sep 2018 16:54:05 -0400 Subject: 4.3.0 policy-handler - tls to policy-engine - tls to policy-engine - tls on web-socket to policy-engine - tls to deployment-handler - no tls on the web-server side = that is internal API = will add TLS in R4 - policy-handler expecting the deployment process to mount certs at /opt/app/policy_handler/etc/tls/certs/ - blueprint for policy-handler will be updated to contain cert_directory : /opt/app/policy_handler/etc/tls/certs/ - the matching local etc/config.json has new part tls with: = cert_directory : etc/tls/certs/ = cacert : cacert.pem - new optional fields tls_ca_mode in config on consul that specify where to find the cacert.pem for tls per each https/web-socket values are: "cert_directory" - use the cacert.pem stored locally in cert_directory this is the default if cacert.pem file is found "os_ca_bundle" - use the public ca_bundle provided by linux system. this is the default if cacert.pem file not found "do_not_verify" - special hack to turn off the verification by cacert and hostname - config on consul now has 2 new fields for policy_engine = "tls_ca_mode" : "cert_directory" = "tls_wss_ca_mode" : "cert_directory" - config on consul now has 1 new field for deploy_handler = "tls_ca_mode" : "cert_directory" - removed customization for verify -- it is now a built-in feature Change-Id: Ibe9120504ed6036d1ed4c84ff4cd8ad1d9e80f17 Signed-off-by: Alex Shatov Issue-ID: DCAEGEN2-611 --- README.md | 11 --- etc/config.json | 6 +- policyhandler/config.py | 101 +++++++++++++++++++++++++++- policyhandler/customize/customizer_base.py | 8 --- policyhandler/deploy_handler.py | 104 +++++++++++++++-------------- policyhandler/policy_receiver.py | 55 ++++++++++++--- policyhandler/policy_rest.py | 47 +++++++------ policyhandler/policy_updater.py | 2 +- pom.xml | 2 +- run_policy.sh | 2 +- setup.py | 2 +- tests/mock_config.json | 9 ++- tests/test_policyhandler.py | 16 +++-- version.properties | 2 +- 14 files changed, 252 insertions(+), 115 deletions(-) diff --git a/README.md b/README.md index 75ff79a..15c9002 100644 --- a/README.md +++ b/README.md @@ -119,17 +119,6 @@ class Customizer(CustomizerBase): service_url = super().get_service_url(audit, service_name, service) audit.info("TODO: customization for service_url on {0}".format(service_name)) return service_url - - def get_deploy_handler_kwargs(self, audit): - """ - returns the optional dict-kwargs for requests.post to deploy-handler - - this is just a sample code - replace it with the real customization - """ - kwargs = {"verify": "/usr/local/share/ca-certificates/aafcacert.crt"} - audit.info("kwargs for requests.post to deploy-handler: {0}".format(json.dumps(kwargs))) - return kwargs - ``` ---------- diff --git a/etc/config.json b/etc/config.json index e54569b..aa24419 100644 --- a/etc/config.json +++ b/etc/config.json @@ -1,7 +1,11 @@ { "wservice_port" : 25577, "policy_handler" : { - "system" : "policy_handler" + "system" : "policy_handler", + "tls" : { + "cert_directory" : "etc/tls/certs/", + "cacert" : "cacert.pem" + } }, "logging" : { "version": 1, diff --git a/policyhandler/config.py b/policyhandler/config.py index 3d68235..a69954f 100644 --- a/policyhandler/config.py +++ b/policyhandler/config.py @@ -56,6 +56,12 @@ class Settings(object): def __str__(self): """get str of the config""" + if not self._changed: + return Audit.json_dumps({ + "config_keys": self._config_keys, + "config": self._config + }) + return Audit.json_dumps({ "config_keys": self._config_keys, "changed": self._changed, @@ -129,6 +135,7 @@ class Config(object): FIELD_SYSTEM = "system" FIELD_WSERVICE_PORT = "wservice_port" + FIELD_TLS = "tls" FIELD_POLICY_ENGINE = "policy_engine" POOL_CONNECTIONS = "pool_connections" DEPLOY_HANDLER = "deploy_handler" @@ -137,12 +144,73 @@ class Config(object): POLICY_RETRY_SLEEP = "policy_retry_sleep" RECONFIGURE = "reconfigure" TIMER_INTERVAL = "interval" + REQUESTS_VERIFY = "verify" + TLS_CA_MODE = "tls_ca_mode" + TLS_WSS_CA_MODE = "tls_wss_ca_mode" + TLS_CA_MODE_DO_NOT_VERIFY = "do_not_verify" system_name = SERVICE_NAME_POLICY_HANDLER wservice_port = 25577 + tls_cacert_file = None + tls_server_cert_file = None + tls_private_key_file = None + _local_config = Settings() discovered_config = Settings() + @staticmethod + def _set_tls_config(tls_config): + """verify and set tls certs in config""" + try: + Config.tls_cacert_file = None + Config.tls_server_cert_file = None + Config.tls_private_key_file = None + + if not (tls_config and isinstance(tls_config, dict)): + Config._logger.info("no tls in config: %s", json.dumps(tls_config)) + return + + cert_directory = tls_config.get("cert_directory") + + if not (cert_directory and isinstance(cert_directory, str)): + Config._logger.info("unexpected tls.cert_directory: %r", cert_directory) + return + + cert_directory = os.path.join( + os.path.dirname(os.path.dirname(os.path.realpath(__file__))), cert_directory) + if not (cert_directory and os.path.isdir(cert_directory)): + Config._logger.info("ignoring invalid cert_directory: %s", cert_directory) + return + + cacert = tls_config.get("cacert") + if cacert: + tls_cacert_file = os.path.join(cert_directory, cacert) + if not os.path.isfile(tls_cacert_file): + Config._logger.error("invalid tls_cacert_file: %s", tls_cacert_file) + else: + Config.tls_cacert_file = tls_cacert_file + + server_cert = tls_config.get("server_cert") + if server_cert: + tls_server_cert_file = os.path.join(cert_directory, server_cert) + if not os.path.isfile(tls_server_cert_file): + Config._logger.error("invalid tls_server_cert_file: %s", tls_server_cert_file) + else: + Config.tls_server_cert_file = tls_server_cert_file + + private_key = tls_config.get("private_key") + if private_key: + tls_private_key_file = os.path.join(cert_directory, private_key) + if not os.path.isfile(tls_private_key_file): + Config._logger.error("invalid tls_private_key_file: %s", tls_private_key_file) + else: + Config.tls_private_key_file = tls_private_key_file + + finally: + Config._logger.info("tls_cacert_file = %s", Config.tls_cacert_file) + Config._logger.info("tls_server_cert_file = %s", Config.tls_server_cert_file) + Config._logger.info("tls_private_key_file = %s", Config.tls_private_key_file) + @staticmethod def init_config(file_path=None): """read and store the config from config file""" @@ -169,9 +237,11 @@ class Config(object): Config.wservice_port = loaded_config.get(Config.FIELD_WSERVICE_PORT, Config.wservice_port) - local_config = loaded_config.get(Config.SERVICE_NAME_POLICY_HANDLER) + local_config = loaded_config.get(Config.SERVICE_NAME_POLICY_HANDLER, {}) Config.system_name = local_config.get(Config.FIELD_SYSTEM, Config.system_name) + Config._set_tls_config(local_config.get(Config.FIELD_TLS)) + Config._local_config.set_config(local_config, auto_commit=True) Config._logger.info("config loaded from file(%s): %s", file_path, Config._local_config) @@ -190,3 +260,32 @@ class Config(object): Config.discovered_config.set_config(new_config.get(Config.SERVICE_NAME_POLICY_HANDLER)) Config._logger.info("config from discovery: %s", Config.discovered_config) + + + @staticmethod + def get_tls_verify(tls_ca_mode=None): + """ + generate verify value based on tls_ca_mode + + tls_ca_mode can be one of: + + "cert_directory" - use the cacert.pem stored locally in cert_directory. + this is the default if cacert.pem file is found + + "os_ca_bundle" - use the public ca_bundle provided by linux system. + this is the default if cacert.pem file not found + + "do_not_verify" - special hack to turn off the verification by cacert and hostname + """ + if tls_ca_mode == Config.TLS_CA_MODE_DO_NOT_VERIFY: + return False + + if tls_ca_mode == "os_ca_bundle" or not Config.tls_cacert_file: + return True + + return Config.tls_cacert_file + + @staticmethod + def get_requests_kwargs(tls_ca_mode=None): + """generate kwargs with verify for requests based on the tls_ca_mode""" + return {Config.REQUESTS_VERIFY: Config.get_tls_verify(tls_ca_mode)} diff --git a/policyhandler/customize/customizer_base.py b/policyhandler/customize/customizer_base.py index 561891f..33b8c7d 100644 --- a/policyhandler/customize/customizer_base.py +++ b/policyhandler/customize/customizer_base.py @@ -53,11 +53,3 @@ class CustomizerBase(object): self._logger.info(info) audit.info(info) return service_url - - def get_deploy_handler_kwargs(self, audit): - """returns the optional dict-kwargs for requests.put to deploy_handler""" - info = "no optional kwargs for requests.put to deploy_handler" - self._logger.info(info) - audit.info(info) - kwargs = {} - return kwargs diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py index e308c1a..b4b2468 100644 --- a/policyhandler/deploy_handler.py +++ b/policyhandler/deploy_handler.py @@ -26,7 +26,6 @@ from threading import Lock import requests from .config import Config, Settings -from .customize import CustomizerUser from .discovery import DiscoveryClient from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, AuditResponseCode, Metrics) @@ -157,18 +156,14 @@ class DeployHandler(object): _max_msg_length_mb = 10 _query = {} _target_entity = None - _custom_kwargs = None + _custom_kwargs = {} _server_instance_uuid = None server_instance_changed = False @staticmethod def _init(audit): """set config""" - DeployHandler._custom_kwargs = (CustomizerUser.get_customizer() - .get_deploy_handler_kwargs(audit)) - if (not DeployHandler._custom_kwargs - or not isinstance(DeployHandler._custom_kwargs, dict)): - DeployHandler._custom_kwargs = {} + DeployHandler._custom_kwargs = {} if not DeployHandler._requests_session: DeployHandler._requests_session = requests.Session() @@ -188,11 +183,12 @@ class DeployHandler(object): # config for policy-handler >= 2.4.0 # "deploy_handler" : { # "target_entity" : "deployment_handler", - # "url" : "http://deployment_handler:8188", + # "url" : "https://deployment_handler:8188", # "max_msg_length_mb" : 10, # "query" : { # "cfy_tenant_name" : "default_tenant" - # } + # }, + # "tls_ca_mode" : "cert_directory" # } DeployHandler._target_entity = config_dh.get(TARGET_ENTITY, DeployHandler.DEFAULT_TARGET_ENTITY) @@ -200,8 +196,13 @@ class DeployHandler(object): DeployHandler._max_msg_length_mb = config_dh.get("max_msg_length_mb", DeployHandler._max_msg_length_mb) DeployHandler._query = deepcopy(config_dh.get("query", {})) - DeployHandler._logger.info("dns based routing to %s: url(%s)", - DeployHandler._target_entity, DeployHandler._url) + tls_ca_mode = config_dh.get(Config.TLS_CA_MODE) + DeployHandler._custom_kwargs = Config.get_requests_kwargs(tls_ca_mode) + + DeployHandler._logger.info( + "dns based routing to %s: url(%s) tls_ca_mode(%s) custom_kwargs(%s)", + DeployHandler._target_entity, DeployHandler._url, + tls_ca_mode, json.dumps(DeployHandler._custom_kwargs)) if not DeployHandler._url: # discover routing to deployment-handler at consul-services @@ -258,11 +259,11 @@ class DeployHandler(object): DeployHandler._lazy_init(audit) - str_metrics = "policy_update {0}".format(str(policy_update_message)) + str_metrics = "policy_update {}".format(str(policy_update_message)) metrics_total = Metrics( aud_parent=audit, - targetEntity="{0} total policy_update".format(DeployHandler._target_entity), + targetEntity="{} total policy_update".format(DeployHandler._target_entity), targetServiceName=DeployHandler._url_policy) metrics_total.metrics_start("started {}".format(str_metrics)) @@ -284,15 +285,23 @@ class DeployHandler(object): if not message: return - metrics = Metrics(aud_parent=audit, targetEntity=DeployHandler._target_entity, - targetServiceName=DeployHandler._url_policy) + with DeployHandler._lock: + session = DeployHandler._requests_session + target_entity = DeployHandler._target_entity + url = DeployHandler._url_policy + params = deepcopy(DeployHandler._query) + custom_kwargs = deepcopy(DeployHandler._custom_kwargs) + + metrics = Metrics(aud_parent=audit, targetEntity="{} policy_update".format(target_entity), + targetServiceName=url) headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id} - log_action = "put to {0} at {1}".format( - DeployHandler._target_entity, DeployHandler._url_policy) - log_data = " msg={} headers={}, params={}".format(json.dumps(message), json.dumps(headers), - json.dumps(DeployHandler._query)) - log_line = log_action + log_data + log_action = "put to {} at {}".format(target_entity, url) + log_data = "msg={} headers={}, params={} custom_kwargs({})".format( + json.dumps(message), json.dumps(headers), + json.dumps(params), json.dumps(custom_kwargs)) + log_line = log_action + " " + log_data + DeployHandler._logger.info(log_line) metrics.metrics_start(log_line) @@ -306,20 +315,13 @@ class DeployHandler(object): res = None try: - with DeployHandler._lock: - session = DeployHandler._requests_session - url = DeployHandler._url_policy - params = deepcopy(DeployHandler._query) - custom_kwargs = deepcopy(DeployHandler._custom_kwargs) - - res = session.put(url, json=message, - headers=headers, params=params, **custom_kwargs) + res = session.put(url, json=message, headers=headers, params=params, **custom_kwargs) except Exception as ex: error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value if isinstance(ex, requests.exceptions.RequestException) else AuditHttpCode.SERVER_INTERNAL_ERROR.value) - error_msg = ("failed to {0} {1}: {2}{3}" - .format(log_action, type(ex).__name__, str(ex), log_data)) + error_msg = "failed to {} {}: {} {}".format( + log_action, type(ex).__name__, str(ex), log_data) DeployHandler._logger.exception(error_msg) metrics.set_http_status_code(error_code) audit.set_http_status_code(error_code) @@ -329,8 +331,8 @@ class DeployHandler(object): metrics.set_http_status_code(res.status_code) audit.set_http_status_code(res.status_code) - log_line = "response {0} from {1}: text={2}{3}".format(res.status_code, log_action, - res.text, log_data) + log_line = "response {} from {}: text={} {}".format( + res.status_code, log_action, res.text, log_data) metrics.metrics(log_line) if res.status_code != requests.codes.ok: @@ -349,19 +351,29 @@ class DeployHandler(object): that were deployed by deployment-handler """ DeployHandler._lazy_init(audit) - metrics = Metrics(aud_parent=audit, targetEntity=DeployHandler._target_entity, - targetServiceName=DeployHandler._url_policy) + + with DeployHandler._lock: + session = DeployHandler._requests_session + target_entity = DeployHandler._target_entity + url = DeployHandler._url_policy + params = deepcopy(DeployHandler._query) + custom_kwargs = deepcopy(DeployHandler._custom_kwargs) + + metrics = Metrics(aud_parent=audit, + targetEntity="{} get_deployed_policies".format(target_entity), + targetServiceName=url) headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id} - log_action = "get {0}: {1}".format(DeployHandler._target_entity, DeployHandler._url_policy) - log_data = " headers={}, params={}".format(json.dumps(headers), - json.dumps(DeployHandler._query)) - log_line = log_action + log_data + log_action = "get from {} at {}".format(target_entity, url) + log_data = "headers={}, params={} custom_kwargs({})".format( + json.dumps(headers), json.dumps(params), json.dumps(custom_kwargs)) + log_line = log_action + " " + log_data + DeployHandler._logger.info(log_line) metrics.metrics_start(log_line) if not DeployHandler._url: - error_msg = "no url found to {0}".format(log_line) + error_msg = "no url found to {}".format(log_line) DeployHandler._logger.error(error_msg) metrics.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value) audit.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value) @@ -370,19 +382,13 @@ class DeployHandler(object): res = None try: - with DeployHandler._lock: - session = DeployHandler._requests_session - url = DeployHandler._url_policy - params = deepcopy(DeployHandler._query) - custom_kwargs = deepcopy(DeployHandler._custom_kwargs) - res = session.get(url, headers=headers, params=params, **custom_kwargs) except Exception as ex: error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value if isinstance(ex, requests.exceptions.RequestException) else AuditHttpCode.SERVER_INTERNAL_ERROR.value) - error_msg = ("failed to {0} {1}: {2}{3}" - .format(log_action, type(ex).__name__, str(ex), log_data)) + error_msg = "failed to {} {}: {} {}".format( + log_action, type(ex).__name__, str(ex), log_data) DeployHandler._logger.exception(error_msg) metrics.set_http_status_code(error_code) audit.set_http_status_code(error_code) @@ -392,8 +398,8 @@ class DeployHandler(object): metrics.set_http_status_code(res.status_code) audit.set_http_status_code(res.status_code) - log_line = ("response {0} from {1}: text={2}{3}" - .format(res.status_code, log_action, res.text, log_data)) + log_line = "response {} from {}: text={} {}".format( + res.status_code, log_action, res.text, log_data) metrics.metrics(log_line) if res.status_code != requests.codes.ok: diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py index 1edb24d..96afd59 100644 --- a/policyhandler/policy_receiver.py +++ b/policyhandler/policy_receiver.py @@ -25,8 +25,11 @@ on receiving the policy-notifications, the policy-receiver passes the notifications to policy-updater """ +import copy import json import logging +import os +import ssl import time from threading import Lock, Thread @@ -35,6 +38,7 @@ import websocket from .config import Config, Settings from .policy_consts import MATCHING_CONDITIONS, POLICY_NAME, POLICY_VERSION from .policy_updater import PolicyUpdater +from .policy_utils import Utils LOADED_POLICIES = 'loadedPolicies' REMOVED_POLICIES = 'removedPolicies' @@ -54,6 +58,8 @@ class _PolicyReceiver(Thread): self._settings = Settings(Config.FIELD_POLICY_ENGINE) self._web_socket_url = None + self._web_socket_sslopt = None + self._tls_wss_ca_mode = None self._web_socket = None self.reconfigure() @@ -71,21 +77,39 @@ class _PolicyReceiver(Thread): return False prev_web_socket_url = self._web_socket_url - resturl = config.get("url", "") + config.get("path_pdp", "") + prev_web_socket_sslopt = self._web_socket_sslopt + self._web_socket_sslopt = None + + resturl = (config.get("url", "").lower() + + config.get("path_notifications", "/pdp/notifications")) + + self._tls_wss_ca_mode = config.get(Config.TLS_WSS_CA_MODE) if resturl.startswith("https:"): - self._web_socket_url = resturl.replace("https:", "wss:") + "notifications" + self._web_socket_url = resturl.replace("https:", "wss:") + + verify = Config.get_tls_verify(self._tls_wss_ca_mode) + if verify is False: + self._web_socket_sslopt = {'cert_reqs': ssl.CERT_NONE} + elif verify is True: + pass + else: + self._web_socket_sslopt = {'ca_certs': verify} + else: - self._web_socket_url = resturl.replace("http:", "ws:") + "notifications" + self._web_socket_url = resturl.replace("http:", "ws:") - if self._web_socket_url == prev_web_socket_url: - _PolicyReceiver._logger.info("not changed web_socket_url(%s): %s", - self._web_socket_url, self._settings) + if (self._web_socket_url == prev_web_socket_url + and Utils.are_the_same(prev_web_socket_sslopt, self._web_socket_sslopt)): + _PolicyReceiver._logger.info( + "not changed web_socket_url(%s) or tls_wss_ca_mode(%s): %s", + self._web_socket_url, self._tls_wss_ca_mode, self._settings) self._settings.commit_change() return False - _PolicyReceiver._logger.info("changed web_socket_url(%s): %s", - self._web_socket_url, self._settings) + _PolicyReceiver._logger.info("changed web_socket_url(%s) or tls_wss_ca_mode(%s): %s", + self._web_socket_url, self._tls_wss_ca_mode, + self._settings) self._settings.commit_change() self._stop_notifications() @@ -103,18 +127,27 @@ class _PolicyReceiver(Thread): if restarting: time.sleep(5) + if not self._get_keep_running(): + break + + with self._lock: + web_socket_url = self._web_socket_url + sslopt = copy.deepcopy(self._web_socket_sslopt) + tls_wss_ca_mode = self._tls_wss_ca_mode _PolicyReceiver._logger.info( - "connecting to policy-notifications at: %s", self._web_socket_url) + "connecting to policy-notifications at %s with sslopt(%s) tls_wss_ca_mode(%s)", + web_socket_url, json.dumps(sslopt), tls_wss_ca_mode) + self._web_socket = websocket.WebSocketApp( - self._web_socket_url, + web_socket_url, on_message=self._on_pdp_message, on_close=self._on_ws_close, on_error=self._on_ws_error ) _PolicyReceiver._logger.info("waiting for policy-notifications...") - self._web_socket.run_forever() + self._web_socket.run_forever(sslopt=sslopt) restarting = True _PolicyReceiver._logger.info("exit policy-receiver") diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py index d10f4bf..0713b38 100644 --- a/policyhandler/policy_rest.py +++ b/policyhandler/policy_rest.py @@ -61,6 +61,7 @@ class PolicyRest(object): _url_get_config = None _headers = None _target_entity = None + _custom_kwargs = {} _thread_pool_size = 4 _policy_retry_count = 1 _policy_retry_sleep = 0 @@ -68,6 +69,8 @@ class PolicyRest(object): @staticmethod def _init(): """init static config""" + PolicyRest._custom_kwargs = {} + _, config = PolicyRest._settings.get_by_key(Config.FIELD_POLICY_ENGINE) if not PolicyRest._requests_session: @@ -82,9 +85,9 @@ class PolicyRest(object): 'http://', requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size)) - PolicyRest._url_get_config = (config["url"] + config["path_api"] + PolicyRest._url_get_config = (config.get("url", "") + config.get("path_api", "") + PolicyRest.POLICY_GET_CONFIG) - PolicyRest._headers = config["headers"] + PolicyRest._headers = config.get("headers", {}) PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE) _, PolicyRest._thread_pool_size = PolicyRest._settings.get_by_key( Config.THREAD_POOL_SIZE, 4) @@ -96,9 +99,13 @@ class PolicyRest(object): _, PolicyRest._policy_retry_sleep = PolicyRest._settings.get_by_key( Config.POLICY_RETRY_SLEEP, 0) - PolicyRest._logger.info("PDP url(%s) headers(%s): %s", - PolicyRest._url_get_config, + tls_ca_mode = config.get(Config.TLS_CA_MODE) + PolicyRest._custom_kwargs = Config.get_requests_kwargs(tls_ca_mode) + + PolicyRest._logger.info("PDP(%s) url(%s) headers(%s) tls_ca_mode(%s) custom_kwargs(%s): %s", + PolicyRest._target_entity, PolicyRest._url_get_config, Metrics.json_dumps(PolicyRest._headers), + tls_ca_mode, json.dumps(PolicyRest._custom_kwargs), PolicyRest._settings) PolicyRest._settings.commit_change() @@ -133,25 +140,27 @@ class PolicyRest(object): @staticmethod def _pdp_get_config(audit, json_body): """Communication with the policy-engine""" - metrics = Metrics(aud_parent=audit, targetEntity=PolicyRest._target_entity, - targetServiceName=PolicyRest._url_get_config) - with PolicyRest._lock: session = PolicyRest._requests_session + target_entity = PolicyRest._target_entity url = PolicyRest._url_get_config headers = copy.deepcopy(PolicyRest._headers) + custom_kwargs = copy.deepcopy(PolicyRest._custom_kwargs) + + metrics = Metrics(aud_parent=audit, targetEntity=target_entity, targetServiceName=url) headers[REQUEST_X_ECOMP_REQUESTID] = metrics.request_id - headers_str = Metrics.json_dumps(headers) - msg = json.dumps(json_body) - log_line = "post to PDP {} msg={} headers={}".format(url, msg, headers_str) + log_action = "post to {} at {}".format(target_entity, url) + log_data = "msg={} headers={}, custom_kwargs({})".format( + json.dumps(json_body), Metrics.json_dumps(headers), json.dumps(custom_kwargs)) + log_line = log_action + " " + log_data PolicyRest._logger.info(metrics.metrics_start(log_line)) res = None try: - res = session.post(url, json=json_body, headers=headers) + res = session.post(url, json=json_body, headers=headers, **custom_kwargs) except Exception as ex: error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value if isinstance(ex, requests.exceptions.RequestException) @@ -164,8 +173,8 @@ class PolicyRest(object): metrics.metrics(error_msg) return (error_code, None) - log_line = "response {} from post to PDP {}: msg={} text={} headers={}".format( - res.status_code, url, msg, res.text, + log_line = "response {} from {}: text={} headers={}".format( + res.status_code, log_line, res.text, Metrics.json_dumps(dict(res.request.headers.items()))) status_code, res_data = PolicyRest._extract_pdp_res_data(audit, metrics, log_line, res) @@ -280,13 +289,13 @@ class PolicyRest(object): break if retry == PolicyRest._policy_retry_count: - audit.warn("gave up retrying {0} from PDP after #{1} for policy_id={2}" + audit.warn("gave up retrying {} from PDP after #{} for policy_id={}" .format(PolicyRest._url_get_config, retry, policy_id), error_code=AuditResponseCode.DATA_ERROR) break audit.warn( - "retry #{0} {1} from PDP in {2} secs for policy_id={3}".format( + "retry #{} {} from PDP in {} secs for policy_id={}".format( retry, PolicyRest._url_get_config, PolicyRest._policy_retry_sleep, policy_id), error_code=AuditResponseCode.DATA_ERROR) @@ -301,7 +310,7 @@ class PolicyRest(object): if not PolicyRest._validate_policy(latest_policy): audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) audit.error( - "received invalid policy from PDP: {0}".format(json.dumps(latest_policy)), + "received invalid policy from PDP: {}".format(json.dumps(latest_policy)), error_code=AuditResponseCode.DATA_ERROR) return latest_policy @@ -322,7 +331,7 @@ class PolicyRest(object): ) if not latest_policy and not expect_policy_removed: - audit.error("received unexpected policy data from PDP for policy_id={0}: {1}" + audit.error("received unexpected policy data from PDP for policy_id={}: {}" .format(policy_id, json.dumps(policy_bodies or [])), error_code=AuditResponseCode.DATA_ERROR) @@ -436,7 +445,7 @@ class PolicyRest(object): if errored_policies: audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) audit.error( - "errored_policies in PDP: {0}".format(json.dumps(errored_policies)), + "errored_policies in PDP: {}".format(json.dumps(errored_policies)), error_code=AuditResponseCode.DATA_ERROR) return updated_policies, removed_policies @@ -460,7 +469,7 @@ class PolicyRest(object): if not latest_policies: audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) audit.warn( - "received no policies from PDP for policy_filter {0}: {1}" + "received no policies from PDP for policy_filter {}: {}" .format(str_policy_filter, json.dumps(policy_bodies or [])), error_code=AuditResponseCode.DATA_ERROR) return None, latest_policies diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py index 8909cc7..235e2b6 100644 --- a/policyhandler/policy_updater.py +++ b/policyhandler/policy_updater.py @@ -420,7 +420,7 @@ class PolicyUpdater(Thread): PolicyUpdater._logger.warning(result) elif not updated_policies and not removed_policies: result = "- not sending empty policy-updates to deployment-handler" - PolicyUpdater._logger.warning(result) + PolicyUpdater._logger.info(result) else: message = PolicyUpdateMessage(updated_policies, removed_policies, policy_filter_matches, False) diff --git a/pom.xml b/pom.xml index 54b234c..8b6ca2b 100644 --- a/pom.xml +++ b/pom.xml @@ -30,7 +30,7 @@ ECOMP is a trademark and service mark of AT&T Intellectual Property. org.onap.dcaegen2.platform policy-handler dcaegen2-platform-policy-handler - 4.2.0-SNAPSHOT + 4.3.0-SNAPSHOT http://maven.apache.org UTF-8 diff --git a/run_policy.sh b/run_policy.sh index 1d411b2..3b804ec 100644 --- a/run_policy.sh +++ b/run_policy.sh @@ -26,7 +26,7 @@ STARTED=$(date +%Y-%m-%d_%T.%N) echo "${STARTED}: running ${BASH_SOURCE[0]}" echo "APP_VER =" $(python setup.py --version) echo "HOSTNAME =" ${HOSTNAME} -(uname -a; echo "/etc/hosts"; cat /etc/hosts; pwd) +(uname -a; echo "/etc/hosts"; cat /etc/hosts; pwd; openssl version -a) python -m policyhandler & PID=$! diff --git a/setup.py b/setup.py index c620e2b..2168cd6 100644 --- a/setup.py +++ b/setup.py @@ -23,7 +23,7 @@ from setuptools import setup setup( name='policyhandler', description='DCAE-Controller policy-handler to communicate with policy-engine', - version="4.2.0", + version="4.3.0", author='Alex Shatov', packages=['policyhandler'], zip_safe=False, diff --git a/tests/mock_config.json b/tests/mock_config.json index 98b0d19..7ec0ac6 100644 --- a/tests/mock_config.json +++ b/tests/mock_config.json @@ -13,7 +13,7 @@ }, "policy_engine" : { "url" : "https://pdp-server:8081", - "path_pdp" : "/pdp/", + "path_notifications" : "/pdp/notifications", "path_api" : "/pdp/api/", "headers" : { "Accept" : "application/json", @@ -22,7 +22,9 @@ "Authorization" : "Basic auth", "Environment" : "TEST" }, - "target_entity" : "policy_engine" + "target_entity" : "policy_engine", + "tls_ca_mode" : "cert_directory", + "tls_wss_ca_mode" : "cert_directory" }, "deploy_handler" : { "target_entity" : "deployment_handler", @@ -30,7 +32,8 @@ "max_msg_length_mb" : 5, "query" : { "cfy_tenant_name" : "default_tenant" - } + }, + "tls_ca_mode" : "cert_directory" } } } diff --git a/tests/test_policyhandler.py b/tests/test_policyhandler.py index d14aeea..c501c12 100644 --- a/tests/test_policyhandler.py +++ b/tests/test_policyhandler.py @@ -175,7 +175,7 @@ class MockDeploymentHandler(object): @pytest.fixture() def fix_pdp_post(monkeypatch): """monkeyed request /getConfig to PDP""" - def monkeyed_policy_rest_post(full_path, json=None, headers=None): + def monkeyed_policy_rest_post(full_path, json=None, headers=None, **custom_kwargs): """monkeypatch for the POST to policy-engine""" res_json = MockPolicyEngine.get_config(json.get(POLICY_NAME)) return MonkeyedResponse(full_path, res_json, json, headers) @@ -191,7 +191,7 @@ def fix_pdp_post(monkeypatch): @pytest.fixture() def fix_pdp_post_big(monkeypatch): """monkeyed request /getConfig to PDP""" - def monkeyed_policy_rest_post(full_path, json=None, headers=None): + def monkeyed_policy_rest_post(full_path, json=None, headers=None, **custom_kwargs): """monkeypatch for the POST to policy-engine""" res_json = MockPolicyEngine.get_configs_all() return MonkeyedResponse(full_path, res_json, json, headers) @@ -212,7 +212,7 @@ class MockException(Exception): @pytest.fixture() def fix_pdp_post_boom(monkeypatch): """monkeyed request /getConfig to PDP - exception""" - def monkeyed_policy_rest_post_boom(full_path, json=None, headers=None): + def monkeyed_policy_rest_post_boom(full_path, json=None, headers=None, **custom_kwargs): """monkeypatch for the POST to policy-engine""" raise MockException("fix_pdp_post_boom") @@ -268,12 +268,13 @@ def fix_discovery(monkeypatch): @pytest.fixture() def fix_deploy_handler(monkeypatch): """monkeyed requests to deployment-handler""" - def monkeyed_deploy_handler_put(full_path, json=None, headers=None, params=None): + def monkeyed_deploy_handler_put(full_path, json=None, headers=None, + params=None, **custom_kwargs): """monkeypatch for policy-update request.put to deploy_handler""" return MonkeyedResponse(full_path, MockDeploymentHandler.default_response(), json, headers) - def monkeyed_deploy_handler_get(full_path, headers=None, params=None): + def monkeyed_deploy_handler_get(full_path, headers=None, params=None, **custom_kwargs): """monkeypatch policy-update request.get to deploy_handler""" return MonkeyedResponse(full_path, MockDeploymentHandler.get_deployed_policies(), None, headers) @@ -295,7 +296,8 @@ def fix_deploy_handler(monkeypatch): @pytest.fixture() def fix_deploy_handler_fail(monkeypatch): """monkeyed failed discovery request.get""" - def monkeyed_deploy_handler_put(full_path, json=None, headers=None, params=None): + def monkeyed_deploy_handler_put(full_path, json=None, headers=None, + params=None, **custom_kwargs): """monkeypatch for deploy_handler""" res = MonkeyedResponse( full_path, @@ -305,7 +307,7 @@ def fix_deploy_handler_fail(monkeypatch): res.status_code = 413 return res - def monkeyed_deploy_handler_get(full_path, headers=None, params=None): + def monkeyed_deploy_handler_get(full_path, headers=None, params=None, **custom_kwargs): """monkeypatch policy-update request.get to deploy_handler""" return MonkeyedResponse(full_path, MockDeploymentHandler.default_response(), None, headers) diff --git a/version.properties b/version.properties index 29d7138..e77ddf3 100644 --- a/version.properties +++ b/version.properties @@ -1,5 +1,5 @@ major=4 -minor=2 +minor=3 patch=0 base_version=${major}.${minor}.${patch} release_version=${base_version} -- cgit 1.2.3-korg