diff options
Diffstat (limited to 'policyhandler/deploy_handler.py')
-rw-r--r-- | policyhandler/deploy_handler.py | 106 |
1 files changed, 70 insertions, 36 deletions
diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py index 6d258f2..e308c1a 100644 --- a/policyhandler/deploy_handler.py +++ b/policyhandler/deploy_handler.py @@ -21,17 +21,18 @@ import json import logging from copy import copy, deepcopy +from threading import Lock import requests -from .config import Config +from .config import Config, Settings from .customize import CustomizerUser from .discovery import DiscoveryClient from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, AuditResponseCode, Metrics) from .policy_consts import (CATCH_UP, LATEST_POLICIES, POLICIES, POLICY_FILTER_MATCHES, POLICY_FILTERS, - REMOVED_POLICIES) + REMOVED_POLICIES, TARGET_ENTITY) class PolicyUpdateMessage(object): @@ -144,7 +145,11 @@ class PolicyUpdateMessage(object): class DeployHandler(object): """calling the deployment-handler web apis""" _logger = logging.getLogger("policy_handler.deploy_handler") + DEFAULT_TARGET_ENTITY = "deployment_handler" + _lazy_inited = False + _lock = Lock() + _settings = Settings(Config.POOL_CONNECTIONS, Config.DEPLOY_HANDLER) _requests_session = None _url = None @@ -157,11 +162,8 @@ class DeployHandler(object): server_instance_changed = False @staticmethod - def _lazy_init(audit, rediscover=False): - """ set static properties """ - if DeployHandler._lazy_inited and not rediscover: - return - + def _init(audit): + """set config""" DeployHandler._custom_kwargs = (CustomizerUser.get_customizer() .get_deploy_handler_kwargs(audit)) if (not DeployHandler._custom_kwargs @@ -169,18 +171,18 @@ class DeployHandler(object): DeployHandler._custom_kwargs = {} if not DeployHandler._requests_session: - pool_size = Config.settings.get("pool_connections", 20) DeployHandler._requests_session = requests.Session() + + changed, pool_size = DeployHandler._settings.get_by_key(Config.POOL_CONNECTIONS, 10) + if changed: DeployHandler._requests_session.mount( - 'https://', - requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size) - ) + 'https://', requests.adapters.HTTPAdapter(pool_connections=pool_size, + pool_maxsize=pool_size)) DeployHandler._requests_session.mount( - 'http://', - requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size) - ) + 'http://', requests.adapters.HTTPAdapter(pool_connections=pool_size, + pool_maxsize=pool_size)) - config_dh = Config.settings.get("deploy_handler") + _, config_dh = DeployHandler._settings.get_by_key(Config.DEPLOY_HANDLER) if config_dh and isinstance(config_dh, dict): # dns based routing to deployment-handler # config for policy-handler >= 2.4.0 @@ -192,7 +194,8 @@ class DeployHandler(object): # "cfy_tenant_name" : "default_tenant" # } # } - DeployHandler._target_entity = config_dh.get("target_entity", "deployment_handler") + DeployHandler._target_entity = config_dh.get(TARGET_ENTITY, + DeployHandler.DEFAULT_TARGET_ENTITY) DeployHandler._url = config_dh.get("url") DeployHandler._max_msg_length_mb = config_dh.get("max_msg_length_mb", DeployHandler._max_msg_length_mb) @@ -205,19 +208,45 @@ class DeployHandler(object): if not isinstance(config_dh, dict): # config for policy-handler <= 2.3.1 # "deploy_handler" : "deployment_handler" - DeployHandler._target_entity = str(config_dh or "deployment_handler") + DeployHandler._target_entity = str(config_dh or DeployHandler.DEFAULT_TARGET_ENTITY) DeployHandler._url = DiscoveryClient.get_service_url(audit, DeployHandler._target_entity) DeployHandler._url_policy = str(DeployHandler._url or "") + '/policy' - DeployHandler._logger.info( - "got %s policy url(%s)", DeployHandler._target_entity, DeployHandler._url_policy) + DeployHandler._logger.info("got %s policy url(%s): %s", DeployHandler._target_entity, + DeployHandler._url_policy, DeployHandler._settings) + DeployHandler._settings.commit_change() DeployHandler._lazy_inited = bool(DeployHandler._url) + @staticmethod + def reconfigure(audit): + """reconfigure""" + with DeployHandler._lock: + DeployHandler._settings.set_config(Config.discovered_config) + if not DeployHandler._settings.is_changed(): + DeployHandler._settings.commit_change() + return False + + DeployHandler._lazy_inited = False + DeployHandler._init(audit) + return True @staticmethod - def policy_update(audit, policy_update_message, rediscover=False): + def _lazy_init(audit): + """set config""" + if DeployHandler._lazy_inited: + return + + with DeployHandler._lock: + if DeployHandler._lazy_inited: + return + + DeployHandler._settings.set_config(Config.discovered_config) + DeployHandler._init(audit) + + @staticmethod + def policy_update(audit, policy_update_message): """ segments the big policy_update_message limited by size and sequatially sends each segment as put to deployment-handler at /policy. @@ -227,7 +256,7 @@ class DeployHandler(object): if not policy_update_message or policy_update_message.empty(): return - DeployHandler._lazy_init(audit, rediscover) + DeployHandler._lazy_init(audit) str_metrics = "policy_update {0}".format(str(policy_update_message)) @@ -277,11 +306,14 @@ class DeployHandler(object): res = None try: - res = DeployHandler._requests_session.put( - DeployHandler._url_policy, json=message, - headers=headers, params=DeployHandler._query, - **DeployHandler._custom_kwargs - ) + with DeployHandler._lock: + session = DeployHandler._requests_session + url = DeployHandler._url_policy + params = deepcopy(DeployHandler._query) + custom_kwargs = deepcopy(DeployHandler._custom_kwargs) + + res = session.put(url, json=message, + headers=headers, params=params, **custom_kwargs) except Exception as ex: error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value if isinstance(ex, requests.exceptions.RequestException) @@ -311,12 +343,12 @@ class DeployHandler(object): @staticmethod - def get_deployed_policies(audit, rediscover=False): + def get_deployed_policies(audit): """ Retrieves policies and policy-filters from components that were deployed by deployment-handler """ - DeployHandler._lazy_init(audit, rediscover) + DeployHandler._lazy_init(audit) metrics = Metrics(aud_parent=audit, targetEntity=DeployHandler._target_entity, targetServiceName=DeployHandler._url_policy) headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id} @@ -338,10 +370,13 @@ class DeployHandler(object): res = None try: - res = DeployHandler._requests_session.get( - DeployHandler._url_policy, headers=headers, params=DeployHandler._query, - **DeployHandler._custom_kwargs - ) + with DeployHandler._lock: + session = DeployHandler._requests_session + url = DeployHandler._url_policy + params = deepcopy(DeployHandler._query) + custom_kwargs = deepcopy(DeployHandler._custom_kwargs) + + res = session.get(url, headers=headers, params=params, **custom_kwargs) except Exception as ex: error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value if isinstance(ex, requests.exceptions.RequestException) @@ -390,7 +425,6 @@ class DeployHandler(object): and prev_server_instance_uuid != DeployHandler._server_instance_uuid): DeployHandler.server_instance_changed = True - log_line = ("deployment_handler_changed: {1} != {0}" - .format(prev_server_instance_uuid, DeployHandler._server_instance_uuid)) - metrics.info(log_line) - DeployHandler._logger.info(log_line) + DeployHandler._logger.info(metrics.info( + "deployment_handler_changed: {1} != {0}" + .format(prev_server_instance_uuid, DeployHandler._server_instance_uuid))) |