aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlex Shatov <alexs@att.com>2018-09-14 16:54:05 -0400
committerAlex Shatov <alexs@att.com>2018-09-14 16:54:05 -0400
commit6556fd79eb177d8ed7c390d56410b42afb4a0c70 (patch)
treea45f57fbdd4ba1468390868371484d299d23ed8c
parent1d693376205c66af93283d04e8e9740c947a7d02 (diff)
4.3.0 policy-handler - tls to policy-engine
- tls to policy-engine - tls on web-socket to policy-engine - tls to deployment-handler - no tls on the web-server side = that is internal API = will add TLS in R4 - policy-handler expecting the deployment process to mount certs at /opt/app/policy_handler/etc/tls/certs/ - blueprint for policy-handler will be updated to contain cert_directory : /opt/app/policy_handler/etc/tls/certs/ - the matching local etc/config.json has new part tls with: = cert_directory : etc/tls/certs/ = cacert : cacert.pem - new optional fields tls_ca_mode in config on consul that specify where to find the cacert.pem for tls per each https/web-socket values are: "cert_directory" - use the cacert.pem stored locally in cert_directory this is the default if cacert.pem file is found "os_ca_bundle" - use the public ca_bundle provided by linux system. this is the default if cacert.pem file not found "do_not_verify" - special hack to turn off the verification by cacert and hostname - config on consul now has 2 new fields for policy_engine = "tls_ca_mode" : "cert_directory" = "tls_wss_ca_mode" : "cert_directory" - config on consul now has 1 new field for deploy_handler = "tls_ca_mode" : "cert_directory" - removed customization for verify -- it is now a built-in feature Change-Id: Ibe9120504ed6036d1ed4c84ff4cd8ad1d9e80f17 Signed-off-by: Alex Shatov <alexs@att.com> Issue-ID: DCAEGEN2-611
-rw-r--r--README.md11
-rw-r--r--etc/config.json6
-rw-r--r--policyhandler/config.py101
-rw-r--r--policyhandler/customize/customizer_base.py8
-rw-r--r--policyhandler/deploy_handler.py104
-rw-r--r--policyhandler/policy_receiver.py55
-rw-r--r--policyhandler/policy_rest.py47
-rw-r--r--policyhandler/policy_updater.py2
-rw-r--r--pom.xml2
-rw-r--r--run_policy.sh2
-rw-r--r--setup.py2
-rw-r--r--tests/mock_config.json9
-rw-r--r--tests/test_policyhandler.py16
-rw-r--r--version.properties2
14 files changed, 252 insertions, 115 deletions
diff --git a/README.md b/README.md
index 75ff79a..15c9002 100644
--- a/README.md
+++ b/README.md
@@ -119,17 +119,6 @@ class Customizer(CustomizerBase):
service_url = super().get_service_url(audit, service_name, service)
audit.info("TODO: customization for service_url on {0}".format(service_name))
return service_url
-
- def get_deploy_handler_kwargs(self, audit):
- """
- returns the optional dict-kwargs for requests.post to deploy-handler
-
- this is just a sample code - replace it with the real customization
- """
- kwargs = {"verify": "/usr/local/share/ca-certificates/aafcacert.crt"}
- audit.info("kwargs for requests.post to deploy-handler: {0}".format(json.dumps(kwargs)))
- return kwargs
-
```
----------
diff --git a/etc/config.json b/etc/config.json
index e54569b..aa24419 100644
--- a/etc/config.json
+++ b/etc/config.json
@@ -1,7 +1,11 @@
{
"wservice_port" : 25577,
"policy_handler" : {
- "system" : "policy_handler"
+ "system" : "policy_handler",
+ "tls" : {
+ "cert_directory" : "etc/tls/certs/",
+ "cacert" : "cacert.pem"
+ }
},
"logging" : {
"version": 1,
diff --git a/policyhandler/config.py b/policyhandler/config.py
index 3d68235..a69954f 100644
--- a/policyhandler/config.py
+++ b/policyhandler/config.py
@@ -56,6 +56,12 @@ class Settings(object):
def __str__(self):
"""get str of the config"""
+ if not self._changed:
+ return Audit.json_dumps({
+ "config_keys": self._config_keys,
+ "config": self._config
+ })
+
return Audit.json_dumps({
"config_keys": self._config_keys,
"changed": self._changed,
@@ -129,6 +135,7 @@ class Config(object):
FIELD_SYSTEM = "system"
FIELD_WSERVICE_PORT = "wservice_port"
+ FIELD_TLS = "tls"
FIELD_POLICY_ENGINE = "policy_engine"
POOL_CONNECTIONS = "pool_connections"
DEPLOY_HANDLER = "deploy_handler"
@@ -137,13 +144,74 @@ class Config(object):
POLICY_RETRY_SLEEP = "policy_retry_sleep"
RECONFIGURE = "reconfigure"
TIMER_INTERVAL = "interval"
+ REQUESTS_VERIFY = "verify"
+ TLS_CA_MODE = "tls_ca_mode"
+ TLS_WSS_CA_MODE = "tls_wss_ca_mode"
+ TLS_CA_MODE_DO_NOT_VERIFY = "do_not_verify"
system_name = SERVICE_NAME_POLICY_HANDLER
wservice_port = 25577
+ tls_cacert_file = None
+ tls_server_cert_file = None
+ tls_private_key_file = None
+
_local_config = Settings()
discovered_config = Settings()
@staticmethod
+ def _set_tls_config(tls_config):
+ """verify and set tls certs in config"""
+ try:
+ Config.tls_cacert_file = None
+ Config.tls_server_cert_file = None
+ Config.tls_private_key_file = None
+
+ if not (tls_config and isinstance(tls_config, dict)):
+ Config._logger.info("no tls in config: %s", json.dumps(tls_config))
+ return
+
+ cert_directory = tls_config.get("cert_directory")
+
+ if not (cert_directory and isinstance(cert_directory, str)):
+ Config._logger.info("unexpected tls.cert_directory: %r", cert_directory)
+ return
+
+ cert_directory = os.path.join(
+ os.path.dirname(os.path.dirname(os.path.realpath(__file__))), cert_directory)
+ if not (cert_directory and os.path.isdir(cert_directory)):
+ Config._logger.info("ignoring invalid cert_directory: %s", cert_directory)
+ return
+
+ cacert = tls_config.get("cacert")
+ if cacert:
+ tls_cacert_file = os.path.join(cert_directory, cacert)
+ if not os.path.isfile(tls_cacert_file):
+ Config._logger.error("invalid tls_cacert_file: %s", tls_cacert_file)
+ else:
+ Config.tls_cacert_file = tls_cacert_file
+
+ server_cert = tls_config.get("server_cert")
+ if server_cert:
+ tls_server_cert_file = os.path.join(cert_directory, server_cert)
+ if not os.path.isfile(tls_server_cert_file):
+ Config._logger.error("invalid tls_server_cert_file: %s", tls_server_cert_file)
+ else:
+ Config.tls_server_cert_file = tls_server_cert_file
+
+ private_key = tls_config.get("private_key")
+ if private_key:
+ tls_private_key_file = os.path.join(cert_directory, private_key)
+ if not os.path.isfile(tls_private_key_file):
+ Config._logger.error("invalid tls_private_key_file: %s", tls_private_key_file)
+ else:
+ Config.tls_private_key_file = tls_private_key_file
+
+ finally:
+ Config._logger.info("tls_cacert_file = %s", Config.tls_cacert_file)
+ Config._logger.info("tls_server_cert_file = %s", Config.tls_server_cert_file)
+ Config._logger.info("tls_private_key_file = %s", Config.tls_private_key_file)
+
+ @staticmethod
def init_config(file_path=None):
"""read and store the config from config file"""
if Config._local_config.is_loaded():
@@ -169,9 +237,11 @@ class Config(object):
Config.wservice_port = loaded_config.get(Config.FIELD_WSERVICE_PORT, Config.wservice_port)
- local_config = loaded_config.get(Config.SERVICE_NAME_POLICY_HANDLER)
+ local_config = loaded_config.get(Config.SERVICE_NAME_POLICY_HANDLER, {})
Config.system_name = local_config.get(Config.FIELD_SYSTEM, Config.system_name)
+ Config._set_tls_config(local_config.get(Config.FIELD_TLS))
+
Config._local_config.set_config(local_config, auto_commit=True)
Config._logger.info("config loaded from file(%s): %s", file_path, Config._local_config)
@@ -190,3 +260,32 @@ class Config(object):
Config.discovered_config.set_config(new_config.get(Config.SERVICE_NAME_POLICY_HANDLER))
Config._logger.info("config from discovery: %s", Config.discovered_config)
+
+
+ @staticmethod
+ def get_tls_verify(tls_ca_mode=None):
+ """
+ generate verify value based on tls_ca_mode
+
+ tls_ca_mode can be one of:
+
+ "cert_directory" - use the cacert.pem stored locally in cert_directory.
+ this is the default if cacert.pem file is found
+
+ "os_ca_bundle" - use the public ca_bundle provided by linux system.
+ this is the default if cacert.pem file not found
+
+ "do_not_verify" - special hack to turn off the verification by cacert and hostname
+ """
+ if tls_ca_mode == Config.TLS_CA_MODE_DO_NOT_VERIFY:
+ return False
+
+ if tls_ca_mode == "os_ca_bundle" or not Config.tls_cacert_file:
+ return True
+
+ return Config.tls_cacert_file
+
+ @staticmethod
+ def get_requests_kwargs(tls_ca_mode=None):
+ """generate kwargs with verify for requests based on the tls_ca_mode"""
+ return {Config.REQUESTS_VERIFY: Config.get_tls_verify(tls_ca_mode)}
diff --git a/policyhandler/customize/customizer_base.py b/policyhandler/customize/customizer_base.py
index 561891f..33b8c7d 100644
--- a/policyhandler/customize/customizer_base.py
+++ b/policyhandler/customize/customizer_base.py
@@ -53,11 +53,3 @@ class CustomizerBase(object):
self._logger.info(info)
audit.info(info)
return service_url
-
- def get_deploy_handler_kwargs(self, audit):
- """returns the optional dict-kwargs for requests.put to deploy_handler"""
- info = "no optional kwargs for requests.put to deploy_handler"
- self._logger.info(info)
- audit.info(info)
- kwargs = {}
- return kwargs
diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py
index e308c1a..b4b2468 100644
--- a/policyhandler/deploy_handler.py
+++ b/policyhandler/deploy_handler.py
@@ -26,7 +26,6 @@ from threading import Lock
import requests
from .config import Config, Settings
-from .customize import CustomizerUser
from .discovery import DiscoveryClient
from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode,
AuditResponseCode, Metrics)
@@ -157,18 +156,14 @@ class DeployHandler(object):
_max_msg_length_mb = 10
_query = {}
_target_entity = None
- _custom_kwargs = None
+ _custom_kwargs = {}
_server_instance_uuid = None
server_instance_changed = False
@staticmethod
def _init(audit):
"""set config"""
- DeployHandler._custom_kwargs = (CustomizerUser.get_customizer()
- .get_deploy_handler_kwargs(audit))
- if (not DeployHandler._custom_kwargs
- or not isinstance(DeployHandler._custom_kwargs, dict)):
- DeployHandler._custom_kwargs = {}
+ DeployHandler._custom_kwargs = {}
if not DeployHandler._requests_session:
DeployHandler._requests_session = requests.Session()
@@ -188,11 +183,12 @@ class DeployHandler(object):
# config for policy-handler >= 2.4.0
# "deploy_handler" : {
# "target_entity" : "deployment_handler",
- # "url" : "http://deployment_handler:8188",
+ # "url" : "https://deployment_handler:8188",
# "max_msg_length_mb" : 10,
# "query" : {
# "cfy_tenant_name" : "default_tenant"
- # }
+ # },
+ # "tls_ca_mode" : "cert_directory"
# }
DeployHandler._target_entity = config_dh.get(TARGET_ENTITY,
DeployHandler.DEFAULT_TARGET_ENTITY)
@@ -200,8 +196,13 @@ class DeployHandler(object):
DeployHandler._max_msg_length_mb = config_dh.get("max_msg_length_mb",
DeployHandler._max_msg_length_mb)
DeployHandler._query = deepcopy(config_dh.get("query", {}))
- DeployHandler._logger.info("dns based routing to %s: url(%s)",
- DeployHandler._target_entity, DeployHandler._url)
+ tls_ca_mode = config_dh.get(Config.TLS_CA_MODE)
+ DeployHandler._custom_kwargs = Config.get_requests_kwargs(tls_ca_mode)
+
+ DeployHandler._logger.info(
+ "dns based routing to %s: url(%s) tls_ca_mode(%s) custom_kwargs(%s)",
+ DeployHandler._target_entity, DeployHandler._url,
+ tls_ca_mode, json.dumps(DeployHandler._custom_kwargs))
if not DeployHandler._url:
# discover routing to deployment-handler at consul-services
@@ -258,11 +259,11 @@ class DeployHandler(object):
DeployHandler._lazy_init(audit)
- str_metrics = "policy_update {0}".format(str(policy_update_message))
+ str_metrics = "policy_update {}".format(str(policy_update_message))
metrics_total = Metrics(
aud_parent=audit,
- targetEntity="{0} total policy_update".format(DeployHandler._target_entity),
+ targetEntity="{} total policy_update".format(DeployHandler._target_entity),
targetServiceName=DeployHandler._url_policy)
metrics_total.metrics_start("started {}".format(str_metrics))
@@ -284,15 +285,23 @@ class DeployHandler(object):
if not message:
return
- metrics = Metrics(aud_parent=audit, targetEntity=DeployHandler._target_entity,
- targetServiceName=DeployHandler._url_policy)
+ with DeployHandler._lock:
+ session = DeployHandler._requests_session
+ target_entity = DeployHandler._target_entity
+ url = DeployHandler._url_policy
+ params = deepcopy(DeployHandler._query)
+ custom_kwargs = deepcopy(DeployHandler._custom_kwargs)
+
+ metrics = Metrics(aud_parent=audit, targetEntity="{} policy_update".format(target_entity),
+ targetServiceName=url)
headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id}
- log_action = "put to {0} at {1}".format(
- DeployHandler._target_entity, DeployHandler._url_policy)
- log_data = " msg={} headers={}, params={}".format(json.dumps(message), json.dumps(headers),
- json.dumps(DeployHandler._query))
- log_line = log_action + log_data
+ log_action = "put to {} at {}".format(target_entity, url)
+ log_data = "msg={} headers={}, params={} custom_kwargs({})".format(
+ json.dumps(message), json.dumps(headers),
+ json.dumps(params), json.dumps(custom_kwargs))
+ log_line = log_action + " " + log_data
+
DeployHandler._logger.info(log_line)
metrics.metrics_start(log_line)
@@ -306,20 +315,13 @@ class DeployHandler(object):
res = None
try:
- with DeployHandler._lock:
- session = DeployHandler._requests_session
- url = DeployHandler._url_policy
- params = deepcopy(DeployHandler._query)
- custom_kwargs = deepcopy(DeployHandler._custom_kwargs)
-
- res = session.put(url, json=message,
- headers=headers, params=params, **custom_kwargs)
+ res = session.put(url, json=message, headers=headers, params=params, **custom_kwargs)
except Exception as ex:
error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
if isinstance(ex, requests.exceptions.RequestException)
else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
- error_msg = ("failed to {0} {1}: {2}{3}"
- .format(log_action, type(ex).__name__, str(ex), log_data))
+ error_msg = "failed to {} {}: {} {}".format(
+ log_action, type(ex).__name__, str(ex), log_data)
DeployHandler._logger.exception(error_msg)
metrics.set_http_status_code(error_code)
audit.set_http_status_code(error_code)
@@ -329,8 +331,8 @@ class DeployHandler(object):
metrics.set_http_status_code(res.status_code)
audit.set_http_status_code(res.status_code)
- log_line = "response {0} from {1}: text={2}{3}".format(res.status_code, log_action,
- res.text, log_data)
+ log_line = "response {} from {}: text={} {}".format(
+ res.status_code, log_action, res.text, log_data)
metrics.metrics(log_line)
if res.status_code != requests.codes.ok:
@@ -349,19 +351,29 @@ class DeployHandler(object):
that were deployed by deployment-handler
"""
DeployHandler._lazy_init(audit)
- metrics = Metrics(aud_parent=audit, targetEntity=DeployHandler._target_entity,
- targetServiceName=DeployHandler._url_policy)
+
+ with DeployHandler._lock:
+ session = DeployHandler._requests_session
+ target_entity = DeployHandler._target_entity
+ url = DeployHandler._url_policy
+ params = deepcopy(DeployHandler._query)
+ custom_kwargs = deepcopy(DeployHandler._custom_kwargs)
+
+ metrics = Metrics(aud_parent=audit,
+ targetEntity="{} get_deployed_policies".format(target_entity),
+ targetServiceName=url)
headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id}
- log_action = "get {0}: {1}".format(DeployHandler._target_entity, DeployHandler._url_policy)
- log_data = " headers={}, params={}".format(json.dumps(headers),
- json.dumps(DeployHandler._query))
- log_line = log_action + log_data
+ log_action = "get from {} at {}".format(target_entity, url)
+ log_data = "headers={}, params={} custom_kwargs({})".format(
+ json.dumps(headers), json.dumps(params), json.dumps(custom_kwargs))
+ log_line = log_action + " " + log_data
+
DeployHandler._logger.info(log_line)
metrics.metrics_start(log_line)
if not DeployHandler._url:
- error_msg = "no url found to {0}".format(log_line)
+ error_msg = "no url found to {}".format(log_line)
DeployHandler._logger.error(error_msg)
metrics.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
audit.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
@@ -370,19 +382,13 @@ class DeployHandler(object):
res = None
try:
- with DeployHandler._lock:
- session = DeployHandler._requests_session
- url = DeployHandler._url_policy
- params = deepcopy(DeployHandler._query)
- custom_kwargs = deepcopy(DeployHandler._custom_kwargs)
-
res = session.get(url, headers=headers, params=params, **custom_kwargs)
except Exception as ex:
error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
if isinstance(ex, requests.exceptions.RequestException)
else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
- error_msg = ("failed to {0} {1}: {2}{3}"
- .format(log_action, type(ex).__name__, str(ex), log_data))
+ error_msg = "failed to {} {}: {} {}".format(
+ log_action, type(ex).__name__, str(ex), log_data)
DeployHandler._logger.exception(error_msg)
metrics.set_http_status_code(error_code)
audit.set_http_status_code(error_code)
@@ -392,8 +398,8 @@ class DeployHandler(object):
metrics.set_http_status_code(res.status_code)
audit.set_http_status_code(res.status_code)
- log_line = ("response {0} from {1}: text={2}{3}"
- .format(res.status_code, log_action, res.text, log_data))
+ log_line = "response {} from {}: text={} {}".format(
+ res.status_code, log_action, res.text, log_data)
metrics.metrics(log_line)
if res.status_code != requests.codes.ok:
diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py
index 1edb24d..96afd59 100644
--- a/policyhandler/policy_receiver.py
+++ b/policyhandler/policy_receiver.py
@@ -25,8 +25,11 @@ on receiving the policy-notifications, the policy-receiver
passes the notifications to policy-updater
"""
+import copy
import json
import logging
+import os
+import ssl
import time
from threading import Lock, Thread
@@ -35,6 +38,7 @@ import websocket
from .config import Config, Settings
from .policy_consts import MATCHING_CONDITIONS, POLICY_NAME, POLICY_VERSION
from .policy_updater import PolicyUpdater
+from .policy_utils import Utils
LOADED_POLICIES = 'loadedPolicies'
REMOVED_POLICIES = 'removedPolicies'
@@ -54,6 +58,8 @@ class _PolicyReceiver(Thread):
self._settings = Settings(Config.FIELD_POLICY_ENGINE)
self._web_socket_url = None
+ self._web_socket_sslopt = None
+ self._tls_wss_ca_mode = None
self._web_socket = None
self.reconfigure()
@@ -71,21 +77,39 @@ class _PolicyReceiver(Thread):
return False
prev_web_socket_url = self._web_socket_url
- resturl = config.get("url", "") + config.get("path_pdp", "")
+ prev_web_socket_sslopt = self._web_socket_sslopt
+ self._web_socket_sslopt = None
+
+ resturl = (config.get("url", "").lower()
+ + config.get("path_notifications", "/pdp/notifications"))
+
+ self._tls_wss_ca_mode = config.get(Config.TLS_WSS_CA_MODE)
if resturl.startswith("https:"):
- self._web_socket_url = resturl.replace("https:", "wss:") + "notifications"
+ self._web_socket_url = resturl.replace("https:", "wss:")
+
+ verify = Config.get_tls_verify(self._tls_wss_ca_mode)
+ if verify is False:
+ self._web_socket_sslopt = {'cert_reqs': ssl.CERT_NONE}
+ elif verify is True:
+ pass
+ else:
+ self._web_socket_sslopt = {'ca_certs': verify}
+
else:
- self._web_socket_url = resturl.replace("http:", "ws:") + "notifications"
+ self._web_socket_url = resturl.replace("http:", "ws:")
- if self._web_socket_url == prev_web_socket_url:
- _PolicyReceiver._logger.info("not changed web_socket_url(%s): %s",
- self._web_socket_url, self._settings)
+ if (self._web_socket_url == prev_web_socket_url
+ and Utils.are_the_same(prev_web_socket_sslopt, self._web_socket_sslopt)):
+ _PolicyReceiver._logger.info(
+ "not changed web_socket_url(%s) or tls_wss_ca_mode(%s): %s",
+ self._web_socket_url, self._tls_wss_ca_mode, self._settings)
self._settings.commit_change()
return False
- _PolicyReceiver._logger.info("changed web_socket_url(%s): %s",
- self._web_socket_url, self._settings)
+ _PolicyReceiver._logger.info("changed web_socket_url(%s) or tls_wss_ca_mode(%s): %s",
+ self._web_socket_url, self._tls_wss_ca_mode,
+ self._settings)
self._settings.commit_change()
self._stop_notifications()
@@ -103,18 +127,27 @@ class _PolicyReceiver(Thread):
if restarting:
time.sleep(5)
+ if not self._get_keep_running():
+ break
+
+ with self._lock:
+ web_socket_url = self._web_socket_url
+ sslopt = copy.deepcopy(self._web_socket_sslopt)
+ tls_wss_ca_mode = self._tls_wss_ca_mode
_PolicyReceiver._logger.info(
- "connecting to policy-notifications at: %s", self._web_socket_url)
+ "connecting to policy-notifications at %s with sslopt(%s) tls_wss_ca_mode(%s)",
+ web_socket_url, json.dumps(sslopt), tls_wss_ca_mode)
+
self._web_socket = websocket.WebSocketApp(
- self._web_socket_url,
+ web_socket_url,
on_message=self._on_pdp_message,
on_close=self._on_ws_close,
on_error=self._on_ws_error
)
_PolicyReceiver._logger.info("waiting for policy-notifications...")
- self._web_socket.run_forever()
+ self._web_socket.run_forever(sslopt=sslopt)
restarting = True
_PolicyReceiver._logger.info("exit policy-receiver")
diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py
index d10f4bf..0713b38 100644
--- a/policyhandler/policy_rest.py
+++ b/policyhandler/policy_rest.py
@@ -61,6 +61,7 @@ class PolicyRest(object):
_url_get_config = None
_headers = None
_target_entity = None
+ _custom_kwargs = {}
_thread_pool_size = 4
_policy_retry_count = 1
_policy_retry_sleep = 0
@@ -68,6 +69,8 @@ class PolicyRest(object):
@staticmethod
def _init():
"""init static config"""
+ PolicyRest._custom_kwargs = {}
+
_, config = PolicyRest._settings.get_by_key(Config.FIELD_POLICY_ENGINE)
if not PolicyRest._requests_session:
@@ -82,9 +85,9 @@ class PolicyRest(object):
'http://', requests.adapters.HTTPAdapter(pool_connections=pool_size,
pool_maxsize=pool_size))
- PolicyRest._url_get_config = (config["url"] + config["path_api"]
+ PolicyRest._url_get_config = (config.get("url", "") + config.get("path_api", "")
+ PolicyRest.POLICY_GET_CONFIG)
- PolicyRest._headers = config["headers"]
+ PolicyRest._headers = config.get("headers", {})
PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE)
_, PolicyRest._thread_pool_size = PolicyRest._settings.get_by_key(
Config.THREAD_POOL_SIZE, 4)
@@ -96,9 +99,13 @@ class PolicyRest(object):
_, PolicyRest._policy_retry_sleep = PolicyRest._settings.get_by_key(
Config.POLICY_RETRY_SLEEP, 0)
- PolicyRest._logger.info("PDP url(%s) headers(%s): %s",
- PolicyRest._url_get_config,
+ tls_ca_mode = config.get(Config.TLS_CA_MODE)
+ PolicyRest._custom_kwargs = Config.get_requests_kwargs(tls_ca_mode)
+
+ PolicyRest._logger.info("PDP(%s) url(%s) headers(%s) tls_ca_mode(%s) custom_kwargs(%s): %s",
+ PolicyRest._target_entity, PolicyRest._url_get_config,
Metrics.json_dumps(PolicyRest._headers),
+ tls_ca_mode, json.dumps(PolicyRest._custom_kwargs),
PolicyRest._settings)
PolicyRest._settings.commit_change()
@@ -133,25 +140,27 @@ class PolicyRest(object):
@staticmethod
def _pdp_get_config(audit, json_body):
"""Communication with the policy-engine"""
- metrics = Metrics(aud_parent=audit, targetEntity=PolicyRest._target_entity,
- targetServiceName=PolicyRest._url_get_config)
-
with PolicyRest._lock:
session = PolicyRest._requests_session
+ target_entity = PolicyRest._target_entity
url = PolicyRest._url_get_config
headers = copy.deepcopy(PolicyRest._headers)
+ custom_kwargs = copy.deepcopy(PolicyRest._custom_kwargs)
+
+ metrics = Metrics(aud_parent=audit, targetEntity=target_entity, targetServiceName=url)
headers[REQUEST_X_ECOMP_REQUESTID] = metrics.request_id
- headers_str = Metrics.json_dumps(headers)
- msg = json.dumps(json_body)
- log_line = "post to PDP {} msg={} headers={}".format(url, msg, headers_str)
+ log_action = "post to {} at {}".format(target_entity, url)
+ log_data = "msg={} headers={}, custom_kwargs({})".format(
+ json.dumps(json_body), Metrics.json_dumps(headers), json.dumps(custom_kwargs))
+ log_line = log_action + " " + log_data
PolicyRest._logger.info(metrics.metrics_start(log_line))
res = None
try:
- res = session.post(url, json=json_body, headers=headers)
+ res = session.post(url, json=json_body, headers=headers, **custom_kwargs)
except Exception as ex:
error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
if isinstance(ex, requests.exceptions.RequestException)
@@ -164,8 +173,8 @@ class PolicyRest(object):
metrics.metrics(error_msg)
return (error_code, None)
- log_line = "response {} from post to PDP {}: msg={} text={} headers={}".format(
- res.status_code, url, msg, res.text,
+ log_line = "response {} from {}: text={} headers={}".format(
+ res.status_code, log_line, res.text,
Metrics.json_dumps(dict(res.request.headers.items())))
status_code, res_data = PolicyRest._extract_pdp_res_data(audit, metrics, log_line, res)
@@ -280,13 +289,13 @@ class PolicyRest(object):
break
if retry == PolicyRest._policy_retry_count:
- audit.warn("gave up retrying {0} from PDP after #{1} for policy_id={2}"
+ audit.warn("gave up retrying {} from PDP after #{} for policy_id={}"
.format(PolicyRest._url_get_config, retry, policy_id),
error_code=AuditResponseCode.DATA_ERROR)
break
audit.warn(
- "retry #{0} {1} from PDP in {2} secs for policy_id={3}".format(
+ "retry #{} {} from PDP in {} secs for policy_id={}".format(
retry, PolicyRest._url_get_config,
PolicyRest._policy_retry_sleep, policy_id),
error_code=AuditResponseCode.DATA_ERROR)
@@ -301,7 +310,7 @@ class PolicyRest(object):
if not PolicyRest._validate_policy(latest_policy):
audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
audit.error(
- "received invalid policy from PDP: {0}".format(json.dumps(latest_policy)),
+ "received invalid policy from PDP: {}".format(json.dumps(latest_policy)),
error_code=AuditResponseCode.DATA_ERROR)
return latest_policy
@@ -322,7 +331,7 @@ class PolicyRest(object):
)
if not latest_policy and not expect_policy_removed:
- audit.error("received unexpected policy data from PDP for policy_id={0}: {1}"
+ audit.error("received unexpected policy data from PDP for policy_id={}: {}"
.format(policy_id, json.dumps(policy_bodies or [])),
error_code=AuditResponseCode.DATA_ERROR)
@@ -436,7 +445,7 @@ class PolicyRest(object):
if errored_policies:
audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
audit.error(
- "errored_policies in PDP: {0}".format(json.dumps(errored_policies)),
+ "errored_policies in PDP: {}".format(json.dumps(errored_policies)),
error_code=AuditResponseCode.DATA_ERROR)
return updated_policies, removed_policies
@@ -460,7 +469,7 @@ class PolicyRest(object):
if not latest_policies:
audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
audit.warn(
- "received no policies from PDP for policy_filter {0}: {1}"
+ "received no policies from PDP for policy_filter {}: {}"
.format(str_policy_filter, json.dumps(policy_bodies or [])),
error_code=AuditResponseCode.DATA_ERROR)
return None, latest_policies
diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py
index 8909cc7..235e2b6 100644
--- a/policyhandler/policy_updater.py
+++ b/policyhandler/policy_updater.py
@@ -420,7 +420,7 @@ class PolicyUpdater(Thread):
PolicyUpdater._logger.warning(result)
elif not updated_policies and not removed_policies:
result = "- not sending empty policy-updates to deployment-handler"
- PolicyUpdater._logger.warning(result)
+ PolicyUpdater._logger.info(result)
else:
message = PolicyUpdateMessage(updated_policies, removed_policies,
policy_filter_matches, False)
diff --git a/pom.xml b/pom.xml
index 54b234c..8b6ca2b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,7 +30,7 @@ ECOMP is a trademark and service mark of AT&T Intellectual Property.
<groupId>org.onap.dcaegen2.platform</groupId>
<artifactId>policy-handler</artifactId>
<name>dcaegen2-platform-policy-handler</name>
- <version>4.2.0-SNAPSHOT</version>
+ <version>4.3.0-SNAPSHOT</version>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
diff --git a/run_policy.sh b/run_policy.sh
index 1d411b2..3b804ec 100644
--- a/run_policy.sh
+++ b/run_policy.sh
@@ -26,7 +26,7 @@ STARTED=$(date +%Y-%m-%d_%T.%N)
echo "${STARTED}: running ${BASH_SOURCE[0]}"
echo "APP_VER =" $(python setup.py --version)
echo "HOSTNAME =" ${HOSTNAME}
-(uname -a; echo "/etc/hosts"; cat /etc/hosts; pwd)
+(uname -a; echo "/etc/hosts"; cat /etc/hosts; pwd; openssl version -a)
python -m policyhandler &
PID=$!
diff --git a/setup.py b/setup.py
index c620e2b..2168cd6 100644
--- a/setup.py
+++ b/setup.py
@@ -23,7 +23,7 @@ from setuptools import setup
setup(
name='policyhandler',
description='DCAE-Controller policy-handler to communicate with policy-engine',
- version="4.2.0",
+ version="4.3.0",
author='Alex Shatov',
packages=['policyhandler'],
zip_safe=False,
diff --git a/tests/mock_config.json b/tests/mock_config.json
index 98b0d19..7ec0ac6 100644
--- a/tests/mock_config.json
+++ b/tests/mock_config.json
@@ -13,7 +13,7 @@
},
"policy_engine" : {
"url" : "https://pdp-server:8081",
- "path_pdp" : "/pdp/",
+ "path_notifications" : "/pdp/notifications",
"path_api" : "/pdp/api/",
"headers" : {
"Accept" : "application/json",
@@ -22,7 +22,9 @@
"Authorization" : "Basic auth",
"Environment" : "TEST"
},
- "target_entity" : "policy_engine"
+ "target_entity" : "policy_engine",
+ "tls_ca_mode" : "cert_directory",
+ "tls_wss_ca_mode" : "cert_directory"
},
"deploy_handler" : {
"target_entity" : "deployment_handler",
@@ -30,7 +32,8 @@
"max_msg_length_mb" : 5,
"query" : {
"cfy_tenant_name" : "default_tenant"
- }
+ },
+ "tls_ca_mode" : "cert_directory"
}
}
}
diff --git a/tests/test_policyhandler.py b/tests/test_policyhandler.py
index d14aeea..c501c12 100644
--- a/tests/test_policyhandler.py
+++ b/tests/test_policyhandler.py
@@ -175,7 +175,7 @@ class MockDeploymentHandler(object):
@pytest.fixture()
def fix_pdp_post(monkeypatch):
"""monkeyed request /getConfig to PDP"""
- def monkeyed_policy_rest_post(full_path, json=None, headers=None):
+ def monkeyed_policy_rest_post(full_path, json=None, headers=None, **custom_kwargs):
"""monkeypatch for the POST to policy-engine"""
res_json = MockPolicyEngine.get_config(json.get(POLICY_NAME))
return MonkeyedResponse(full_path, res_json, json, headers)
@@ -191,7 +191,7 @@ def fix_pdp_post(monkeypatch):
@pytest.fixture()
def fix_pdp_post_big(monkeypatch):
"""monkeyed request /getConfig to PDP"""
- def monkeyed_policy_rest_post(full_path, json=None, headers=None):
+ def monkeyed_policy_rest_post(full_path, json=None, headers=None, **custom_kwargs):
"""monkeypatch for the POST to policy-engine"""
res_json = MockPolicyEngine.get_configs_all()
return MonkeyedResponse(full_path, res_json, json, headers)
@@ -212,7 +212,7 @@ class MockException(Exception):
@pytest.fixture()
def fix_pdp_post_boom(monkeypatch):
"""monkeyed request /getConfig to PDP - exception"""
- def monkeyed_policy_rest_post_boom(full_path, json=None, headers=None):
+ def monkeyed_policy_rest_post_boom(full_path, json=None, headers=None, **custom_kwargs):
"""monkeypatch for the POST to policy-engine"""
raise MockException("fix_pdp_post_boom")
@@ -268,12 +268,13 @@ def fix_discovery(monkeypatch):
@pytest.fixture()
def fix_deploy_handler(monkeypatch):
"""monkeyed requests to deployment-handler"""
- def monkeyed_deploy_handler_put(full_path, json=None, headers=None, params=None):
+ def monkeyed_deploy_handler_put(full_path, json=None, headers=None,
+ params=None, **custom_kwargs):
"""monkeypatch for policy-update request.put to deploy_handler"""
return MonkeyedResponse(full_path, MockDeploymentHandler.default_response(),
json, headers)
- def monkeyed_deploy_handler_get(full_path, headers=None, params=None):
+ def monkeyed_deploy_handler_get(full_path, headers=None, params=None, **custom_kwargs):
"""monkeypatch policy-update request.get to deploy_handler"""
return MonkeyedResponse(full_path, MockDeploymentHandler.get_deployed_policies(),
None, headers)
@@ -295,7 +296,8 @@ def fix_deploy_handler(monkeypatch):
@pytest.fixture()
def fix_deploy_handler_fail(monkeypatch):
"""monkeyed failed discovery request.get"""
- def monkeyed_deploy_handler_put(full_path, json=None, headers=None, params=None):
+ def monkeyed_deploy_handler_put(full_path, json=None, headers=None,
+ params=None, **custom_kwargs):
"""monkeypatch for deploy_handler"""
res = MonkeyedResponse(
full_path,
@@ -305,7 +307,7 @@ def fix_deploy_handler_fail(monkeypatch):
res.status_code = 413
return res
- def monkeyed_deploy_handler_get(full_path, headers=None, params=None):
+ def monkeyed_deploy_handler_get(full_path, headers=None, params=None, **custom_kwargs):
"""monkeypatch policy-update request.get to deploy_handler"""
return MonkeyedResponse(full_path, MockDeploymentHandler.default_response(),
None, headers)
diff --git a/version.properties b/version.properties
index 29d7138..e77ddf3 100644
--- a/version.properties
+++ b/version.properties
@@ -1,5 +1,5 @@
major=4
-minor=2
+minor=3
patch=0
base_version=${major}.${minor}.${patch}
release_version=${base_version}