aboutsummaryrefslogtreecommitdiffstats
path: root/policyhandler/deploy_handler.py
diff options
context:
space:
mode:
Diffstat (limited to 'policyhandler/deploy_handler.py')
-rw-r--r--policyhandler/deploy_handler.py106
1 files changed, 70 insertions, 36 deletions
diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py
index 6d258f2..e308c1a 100644
--- a/policyhandler/deploy_handler.py
+++ b/policyhandler/deploy_handler.py
@@ -21,17 +21,18 @@
import json
import logging
from copy import copy, deepcopy
+from threading import Lock
import requests
-from .config import Config
+from .config import Config, Settings
from .customize import CustomizerUser
from .discovery import DiscoveryClient
from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode,
AuditResponseCode, Metrics)
from .policy_consts import (CATCH_UP, LATEST_POLICIES, POLICIES,
POLICY_FILTER_MATCHES, POLICY_FILTERS,
- REMOVED_POLICIES)
+ REMOVED_POLICIES, TARGET_ENTITY)
class PolicyUpdateMessage(object):
@@ -144,7 +145,11 @@ class PolicyUpdateMessage(object):
class DeployHandler(object):
"""calling the deployment-handler web apis"""
_logger = logging.getLogger("policy_handler.deploy_handler")
+ DEFAULT_TARGET_ENTITY = "deployment_handler"
+
_lazy_inited = False
+ _lock = Lock()
+ _settings = Settings(Config.POOL_CONNECTIONS, Config.DEPLOY_HANDLER)
_requests_session = None
_url = None
@@ -157,11 +162,8 @@ class DeployHandler(object):
server_instance_changed = False
@staticmethod
- def _lazy_init(audit, rediscover=False):
- """ set static properties """
- if DeployHandler._lazy_inited and not rediscover:
- return
-
+ def _init(audit):
+ """set config"""
DeployHandler._custom_kwargs = (CustomizerUser.get_customizer()
.get_deploy_handler_kwargs(audit))
if (not DeployHandler._custom_kwargs
@@ -169,18 +171,18 @@ class DeployHandler(object):
DeployHandler._custom_kwargs = {}
if not DeployHandler._requests_session:
- pool_size = Config.settings.get("pool_connections", 20)
DeployHandler._requests_session = requests.Session()
+
+ changed, pool_size = DeployHandler._settings.get_by_key(Config.POOL_CONNECTIONS, 10)
+ if changed:
DeployHandler._requests_session.mount(
- 'https://',
- requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size)
- )
+ 'https://', requests.adapters.HTTPAdapter(pool_connections=pool_size,
+ pool_maxsize=pool_size))
DeployHandler._requests_session.mount(
- 'http://',
- requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size)
- )
+ 'http://', requests.adapters.HTTPAdapter(pool_connections=pool_size,
+ pool_maxsize=pool_size))
- config_dh = Config.settings.get("deploy_handler")
+ _, config_dh = DeployHandler._settings.get_by_key(Config.DEPLOY_HANDLER)
if config_dh and isinstance(config_dh, dict):
# dns based routing to deployment-handler
# config for policy-handler >= 2.4.0
@@ -192,7 +194,8 @@ class DeployHandler(object):
# "cfy_tenant_name" : "default_tenant"
# }
# }
- DeployHandler._target_entity = config_dh.get("target_entity", "deployment_handler")
+ DeployHandler._target_entity = config_dh.get(TARGET_ENTITY,
+ DeployHandler.DEFAULT_TARGET_ENTITY)
DeployHandler._url = config_dh.get("url")
DeployHandler._max_msg_length_mb = config_dh.get("max_msg_length_mb",
DeployHandler._max_msg_length_mb)
@@ -205,19 +208,45 @@ class DeployHandler(object):
if not isinstance(config_dh, dict):
# config for policy-handler <= 2.3.1
# "deploy_handler" : "deployment_handler"
- DeployHandler._target_entity = str(config_dh or "deployment_handler")
+ DeployHandler._target_entity = str(config_dh or DeployHandler.DEFAULT_TARGET_ENTITY)
DeployHandler._url = DiscoveryClient.get_service_url(audit,
DeployHandler._target_entity)
DeployHandler._url_policy = str(DeployHandler._url or "") + '/policy'
- DeployHandler._logger.info(
- "got %s policy url(%s)", DeployHandler._target_entity, DeployHandler._url_policy)
+ DeployHandler._logger.info("got %s policy url(%s): %s", DeployHandler._target_entity,
+ DeployHandler._url_policy, DeployHandler._settings)
+ DeployHandler._settings.commit_change()
DeployHandler._lazy_inited = bool(DeployHandler._url)
+ @staticmethod
+ def reconfigure(audit):
+ """reconfigure"""
+ with DeployHandler._lock:
+ DeployHandler._settings.set_config(Config.discovered_config)
+ if not DeployHandler._settings.is_changed():
+ DeployHandler._settings.commit_change()
+ return False
+
+ DeployHandler._lazy_inited = False
+ DeployHandler._init(audit)
+ return True
@staticmethod
- def policy_update(audit, policy_update_message, rediscover=False):
+ def _lazy_init(audit):
+ """set config"""
+ if DeployHandler._lazy_inited:
+ return
+
+ with DeployHandler._lock:
+ if DeployHandler._lazy_inited:
+ return
+
+ DeployHandler._settings.set_config(Config.discovered_config)
+ DeployHandler._init(audit)
+
+ @staticmethod
+ def policy_update(audit, policy_update_message):
"""
segments the big policy_update_message limited by size
and sequatially sends each segment as put to deployment-handler at /policy.
@@ -227,7 +256,7 @@ class DeployHandler(object):
if not policy_update_message or policy_update_message.empty():
return
- DeployHandler._lazy_init(audit, rediscover)
+ DeployHandler._lazy_init(audit)
str_metrics = "policy_update {0}".format(str(policy_update_message))
@@ -277,11 +306,14 @@ class DeployHandler(object):
res = None
try:
- res = DeployHandler._requests_session.put(
- DeployHandler._url_policy, json=message,
- headers=headers, params=DeployHandler._query,
- **DeployHandler._custom_kwargs
- )
+ with DeployHandler._lock:
+ session = DeployHandler._requests_session
+ url = DeployHandler._url_policy
+ params = deepcopy(DeployHandler._query)
+ custom_kwargs = deepcopy(DeployHandler._custom_kwargs)
+
+ res = session.put(url, json=message,
+ headers=headers, params=params, **custom_kwargs)
except Exception as ex:
error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
if isinstance(ex, requests.exceptions.RequestException)
@@ -311,12 +343,12 @@ class DeployHandler(object):
@staticmethod
- def get_deployed_policies(audit, rediscover=False):
+ def get_deployed_policies(audit):
"""
Retrieves policies and policy-filters from components
that were deployed by deployment-handler
"""
- DeployHandler._lazy_init(audit, rediscover)
+ DeployHandler._lazy_init(audit)
metrics = Metrics(aud_parent=audit, targetEntity=DeployHandler._target_entity,
targetServiceName=DeployHandler._url_policy)
headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id}
@@ -338,10 +370,13 @@ class DeployHandler(object):
res = None
try:
- res = DeployHandler._requests_session.get(
- DeployHandler._url_policy, headers=headers, params=DeployHandler._query,
- **DeployHandler._custom_kwargs
- )
+ with DeployHandler._lock:
+ session = DeployHandler._requests_session
+ url = DeployHandler._url_policy
+ params = deepcopy(DeployHandler._query)
+ custom_kwargs = deepcopy(DeployHandler._custom_kwargs)
+
+ res = session.get(url, headers=headers, params=params, **custom_kwargs)
except Exception as ex:
error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
if isinstance(ex, requests.exceptions.RequestException)
@@ -390,7 +425,6 @@ class DeployHandler(object):
and prev_server_instance_uuid != DeployHandler._server_instance_uuid):
DeployHandler.server_instance_changed = True
- log_line = ("deployment_handler_changed: {1} != {0}"
- .format(prev_server_instance_uuid, DeployHandler._server_instance_uuid))
- metrics.info(log_line)
- DeployHandler._logger.info(log_line)
+ DeployHandler._logger.info(metrics.info(
+ "deployment_handler_changed: {1} != {0}"
+ .format(prev_server_instance_uuid, DeployHandler._server_instance_uuid)))