diff options
author | Alex Shatov <alexs@att.com> | 2018-08-07 12:11:35 -0400 |
---|---|---|
committer | Alex Shatov <alexs@att.com> | 2018-08-07 12:11:35 -0400 |
commit | d7f34d4b71ec4d86547628cda351d20bff4d017f (patch) | |
tree | 101c7669fb5508a103894e262964da0d0c8319bc /policyhandler/deploy_handler.py | |
parent | a29f70823b18f492417629f56c86f61f94b96af8 (diff) |
4.0.0 new dataflow on policy-update and catchup
- changed API and functionality - new dataflow
- new dataflow between policy-handler and deployment-handler
on policy-update and catchup
= GETting policy_ids+versions and policy-filters from
deployment-handler
= PUTting policy-update and catchup in the new message format
= data segmenting the policy-update/catchup messages to
deployment-handler to avoid 413 on deployment-handler side
= matching policies from policy-engine to policies
and policy-filters from deployment-handler
= coarsening the policyName filter received from deployment-handler
to reduce the number messages passed to policy-engine on catchup
= consolidating sequential policy-updates into a single request
when the policy-update is busy
- removed policy scope-prefixes from config and logic -
it is not needed anymore because
= the policy matching happens directly to policies and
policy-filters received from deployment-handler
= on catchup - the policy scope-prefix equivalents are calculated
based on the data received from deployment-handler
- API - GET /policies_latest now returns the info on deployed
policy_ids+versions and policy-filters, rather than policies
of the scope-prefixes previously found in config (obsolete)
- not sending an empty catch_up message to deployment-handler
when nothing changed
- send policy-removed to deployment-handler when getting
404-not found from PDP on removal of policy
- config change: removed catch_up.max_skips - obsolete
- brought the latest CommonLogger.py
- minor refactoring - improved naming of variables
Change-Id: I36b3412eefd439088cb693703a6e5f18f4238b00
Signed-off-by: Alex Shatov <alexs@att.com>
Issue-ID: DCAEGEN2-492
Diffstat (limited to 'policyhandler/deploy_handler.py')
-rw-r--r-- | policyhandler/deploy_handler.py | 266 |
1 files changed, 241 insertions, 25 deletions
diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py index ea703f4..6b7788c 100644 --- a/policyhandler/deploy_handler.py +++ b/policyhandler/deploy_handler.py @@ -16,32 +16,144 @@ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. -""" send notification to deploy-handler""" +"""send policy-update notification to deployment-handler""" import json import logging +from copy import copy, deepcopy import requests from .config import Config from .customize import CustomizerUser from .discovery import DiscoveryClient -from .onap.audit import REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, Metrics +from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, + AuditResponseCode, Metrics) +from .policy_consts import (CATCH_UP, LATEST_POLICIES, POLICIES, + POLICY_FILTER_MATCHES, POLICY_FILTERS, + REMOVED_POLICIES) + + +class PolicyUpdateMessage(object): + """class for messages to deployment-handler on policy-update""" + BYTES_IN_MB = 1 << 2 * 10 + + def __init__(self, latest_policies=None, + removed_policies=None, policy_filter_matches=None, catch_up=True): + """init""" + self._catch_up = catch_up + self._latest_policies = deepcopy(latest_policies or {}) + self._removed_policies = copy(removed_policies or {}) + self._policy_filter_matches = deepcopy(policy_filter_matches or {}) + + self._message = { + CATCH_UP: self._catch_up, + LATEST_POLICIES: self._latest_policies, + REMOVED_POLICIES: self._removed_policies, + POLICY_FILTER_MATCHES: self._policy_filter_matches + } + self.msg_length = 0 + self._calc_stats() + + def _calc_stats(self): + """generate the message and calc stats""" + self.msg_length = len(json.dumps(self._message)) + + def empty(self): + """checks whether have any data""" + return (not self._latest_policies + and not self._removed_policies + and not self._policy_filter_matches) + + def add(self, policy_id, latest_policy=None, policy_filter_ids=None, removed_policy=None): + """add the parts from the other message to the current message""" + if not policy_id or not (latest_policy or policy_filter_ids or removed_policy): + return + + if latest_policy: + self._latest_policies[policy_id] = deepcopy(latest_policy) + + if policy_filter_ids: + if policy_id not in self._policy_filter_matches: + self._policy_filter_matches[policy_id] = {} + self._policy_filter_matches[policy_id].update(policy_filter_ids) + + if removed_policy is not None: + self._removed_policies[policy_id] = removed_policy + + self._calc_stats() + + def get_message(self): + """expose the copy of the message""" + return deepcopy(self._message) + + def __str__(self): + """to string""" + return json.dumps(self._message) + + def _iter_over_removed_policies(self): + """generator of iterator over removed_policies""" + for (policy_id, value) in self._removed_policies.items(): + yield (policy_id, value) + + def _iter_over_latest_policies(self): + """generator of iterator over latest_policies and policy_filter_matches""" + for (policy_id, policy) in self._latest_policies.items(): + yield (policy_id, policy, self._policy_filter_matches.get(policy_id)) + + def gen_segmented_messages(self, max_msg_length_mb): + """ + Break the policy-update message into a list of segmented messages. + + Each segmented message should not exceed the max_msg_length_mb from config. + """ + max_msg_length_mb = (max_msg_length_mb or 10) * PolicyUpdateMessage.BYTES_IN_MB + + messages = [] + curr_message = PolicyUpdateMessage(catch_up=self._catch_up) + + for (policy_id, value) in self._iter_over_removed_policies(): + if (not curr_message.empty() + and (len(policy_id) + len(str(value)) + curr_message.msg_length + > max_msg_length_mb)): + messages.append(curr_message.get_message()) + curr_message = PolicyUpdateMessage(catch_up=self._catch_up) + curr_message.add(policy_id, removed_policy=value) + + for (policy_id, policy, policy_filter_ids) in self._iter_over_latest_policies(): + if (not curr_message.empty() + and (2 * len(policy_id) + len(json.dumps(policy)) + + len(json.dumps(policy_filter_ids)) + + curr_message.msg_length > max_msg_length_mb)): + messages.append(curr_message.get_message()) + curr_message = PolicyUpdateMessage(catch_up=self._catch_up) + curr_message.add(policy_id, latest_policy=policy, policy_filter_ids=policy_filter_ids) + + if not curr_message.empty(): + messages.append(curr_message.get_message()) + + msg_count = len(messages) + if msg_count > 1: + msg_count = "/" + str(msg_count) + for idx, msg in enumerate(messages): + msg["data_segment"] = str((idx+1)) + msg_count + + return messages -POOL_SIZE = 1 class DeployHandler(object): - """ deploy-handler """ + """calling the deployment-handler web apis""" _logger = logging.getLogger("policy_handler.deploy_handler") _lazy_inited = False _requests_session = None - _config = None _url = None _url_policy = None + _max_msg_length_mb = 10 _target_entity = None _custom_kwargs = None _server_instance_uuid = None + server_instance_changed = False @staticmethod def _lazy_init(audit, rediscover=False): @@ -56,14 +168,15 @@ class DeployHandler(object): DeployHandler._custom_kwargs = {} if not DeployHandler._requests_session: + pool_size = Config.settings.get("pool_connections", 20) DeployHandler._requests_session = requests.Session() DeployHandler._requests_session.mount( 'https://', - requests.adapters.HTTPAdapter(pool_connections=POOL_SIZE, pool_maxsize=POOL_SIZE) + requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size) ) DeployHandler._requests_session.mount( 'http://', - requests.adapters.HTTPAdapter(pool_connections=POOL_SIZE, pool_maxsize=POOL_SIZE) + requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size) ) config_dh = Config.settings.get("deploy_handler") @@ -72,10 +185,13 @@ class DeployHandler(object): # config for policy-handler >= 2.4.0 # "deploy_handler" : { # "target_entity" : "deployment_handler", - # "url" : "http://deployment_handler:8188" + # "url" : "http://deployment_handler:8188", + # "max_msg_length_mb" : 100 # } DeployHandler._target_entity = config_dh.get("target_entity", "deployment_handler") DeployHandler._url = config_dh.get("url") + DeployHandler._max_msg_length_mb = config_dh.get("max_msg_length_mb", + DeployHandler._max_msg_length_mb) DeployHandler._logger.info("dns based routing to %s: url(%s)", DeployHandler._target_entity, DeployHandler._url) @@ -96,26 +212,52 @@ class DeployHandler(object): @staticmethod - def policy_update(audit, message, rediscover=False): + def policy_update(audit, policy_update_message, rediscover=False): """ - post policy_updated message to deploy-handler + segments the big policy_update_message limited by size + and sequatially sends each segment as put to deployment-handler at /policy. - returns condition whether it needs to catch_up + param policy_update_message is of PolicyUpdateMessage type """ - if not message: + if not policy_update_message or policy_update_message.empty(): return DeployHandler._lazy_init(audit, rediscover) + + str_metrics = "policy_update {0}".format(str(policy_update_message)) + + metrics_total = Metrics( + aud_parent=audit, + targetEntity="{0} total policy_update".format(DeployHandler._target_entity), + targetServiceName=DeployHandler._url_policy) + + metrics_total.metrics_start("started {}".format(str_metrics)) + messages = policy_update_message.gen_segmented_messages(DeployHandler._max_msg_length_mb) + for message in messages: + DeployHandler._policy_update(audit, message) + if not audit.is_success(): + break + metrics_total.metrics("done {}".format(str_metrics)) + + @staticmethod + def _policy_update(audit, message): + """ + sends the put message to deployment-handler at /policy + + detects whether server_instance_changed condition on deployment-handler + that is the cause to catch_up + """ + if not message: + return + metrics = Metrics(aud_parent=audit, targetEntity=DeployHandler._target_entity, targetServiceName=DeployHandler._url_policy) headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id} - msg_str = json.dumps(message) - headers_str = json.dumps(headers) - - log_action = "post to {0} at {1}".format( + log_action = "put to {0} at {1}".format( DeployHandler._target_entity, DeployHandler._url_policy) - log_data = " msg={0} headers={1}".format(msg_str, headers_str) + log_data = " msg={0} headers={1}".format(json.dumps(message), + json.dumps(headers)) log_line = log_action + log_data DeployHandler._logger.info(log_line) metrics.metrics_start(log_line) @@ -130,7 +272,7 @@ class DeployHandler(object): res = None try: - res = DeployHandler._requests_session.post( + res = DeployHandler._requests_session.put( DeployHandler._url_policy, json=message, headers=headers, **DeployHandler._custom_kwargs ) @@ -149,8 +291,8 @@ class DeployHandler(object): metrics.set_http_status_code(res.status_code) audit.set_http_status_code(res.status_code) - log_line = "response {0} from {1}: text={2}{3}" \ - .format(res.status_code, log_action, res.text, log_data) + log_line = "response {0} from {1}: text={2}{3}".format(res.status_code, log_action, + res.text, log_data) metrics.metrics(log_line) if res.status_code != requests.codes.ok: @@ -159,15 +301,89 @@ class DeployHandler(object): DeployHandler._logger.info(log_line) result = res.json() or {} + DeployHandler._server_instance_changed(result, metrics) + + + @staticmethod + def get_deployed_policies(audit, rediscover=False): + """ + Retrieves policies and policy-filters from components + that were deployed by deployment-handler + """ + DeployHandler._lazy_init(audit, rediscover) + metrics = Metrics(aud_parent=audit, targetEntity=DeployHandler._target_entity, + targetServiceName=DeployHandler._url_policy) + headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id} + + log_action = "get {0}: {1}".format(DeployHandler._target_entity, DeployHandler._url_policy) + log_data = " headers={}".format(json.dumps(headers)) + log_line = log_action + log_data + DeployHandler._logger.info(log_line) + metrics.metrics_start(log_line) + + if not DeployHandler._url: + error_msg = "no url found to {0}".format(log_line) + DeployHandler._logger.error(error_msg) + metrics.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value) + audit.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value) + metrics.metrics(error_msg) + return None, None + + res = None + try: + res = DeployHandler._requests_session.get( + DeployHandler._url_policy, headers=headers, + **DeployHandler._custom_kwargs + ) + except Exception as ex: + error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value + if isinstance(ex, requests.exceptions.RequestException) + else AuditHttpCode.SERVER_INTERNAL_ERROR.value) + error_msg = ("failed to {0} {1}: {2}{3}" + .format(log_action, type(ex).__name__, str(ex), log_data)) + DeployHandler._logger.exception(error_msg) + metrics.set_http_status_code(error_code) + audit.set_http_status_code(error_code) + metrics.metrics(error_msg) + return None, None + + metrics.set_http_status_code(res.status_code) + audit.set_http_status_code(res.status_code) + + log_line = ("response {0} from {1}: text={2}{3}" + .format(res.status_code, log_action, res.text, log_data)) + metrics.metrics(log_line) + + if res.status_code != requests.codes.ok: + DeployHandler._logger.error(log_line) + return None, None + + result = res.json() or {} + DeployHandler._server_instance_changed(result, metrics) + + policies = result.get(POLICIES, {}) + policy_filters = result.get(POLICY_FILTERS, {}) + if not policies and not policy_filters: + audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value) + DeployHandler._logger.warning(audit.warn( + "found no deployed policies or policy-filters: {}".format(log_line), + error_code=AuditResponseCode.DATA_ERROR)) + return policies, policy_filters + + DeployHandler._logger.info(log_line) + return policies, policy_filters + + @staticmethod + def _server_instance_changed(result, metrics): + """Checks whether the deployment-handler instance changed since last call.""" prev_server_instance_uuid = DeployHandler._server_instance_uuid DeployHandler._server_instance_uuid = result.get("server_instance_uuid") - deployment_handler_changed = (prev_server_instance_uuid - and prev_server_instance_uuid != DeployHandler._server_instance_uuid) - if deployment_handler_changed: + if (prev_server_instance_uuid + and prev_server_instance_uuid != DeployHandler._server_instance_uuid): + DeployHandler.server_instance_changed = True + log_line = ("deployment_handler_changed: {1} != {0}" .format(prev_server_instance_uuid, DeployHandler._server_instance_uuid)) metrics.info(log_line) DeployHandler._logger.info(log_line) - - return deployment_handler_changed |