# ================================================================================ # Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============LICENSE_END========================================================= # """send policy-update notification to deployment-handler""" import json from copy import copy, deepcopy from threading import Lock import requests from .config import Config, Settings from .discovery import DiscoveryClient from .onap.audit import AuditHttpCode, AuditResponseCode, Metrics from .policy_consts import (CATCH_UP, LATEST_POLICIES, POLICIES, POLICY_FILTER_MATCHES, POLICY_FILTERS, REMOVED_POLICIES, TARGET_ENTITY) from .utils import Utils _LOGGER = Utils.get_logger(__file__) class PolicyUpdateMessage(object): """class for messages to deployment-handler on policy-update""" BYTES_IN_MB = 1 << 2 * 10 def __init__(self, latest_policies=None, removed_policies=None, policy_filter_matches=None, catch_up=True): """init""" self._catch_up = catch_up self._latest_policies = deepcopy(latest_policies or {}) self._removed_policies = copy(removed_policies or {}) self._policy_filter_matches = deepcopy(policy_filter_matches or {}) self._message = { CATCH_UP: self._catch_up, LATEST_POLICIES: self._latest_policies, REMOVED_POLICIES: self._removed_policies, POLICY_FILTER_MATCHES: self._policy_filter_matches } self.msg_length = 0 self._calc_stats() def _calc_stats(self): """generate the message and calc stats""" self.msg_length = len(json.dumps(self._message)) def empty(self): """checks whether have any data""" return (not self._latest_policies and not self._removed_policies and not self._policy_filter_matches) def add(self, policy_id, latest_policy=None, policy_filter_ids=None, removed_policy=None): """add the parts from the other message to the current message""" if not policy_id or not (latest_policy or policy_filter_ids or removed_policy): return if latest_policy: self._latest_policies[policy_id] = deepcopy(latest_policy) if policy_filter_ids is not None: if policy_id not in self._policy_filter_matches: self._policy_filter_matches[policy_id] = {} self._policy_filter_matches[policy_id].update(policy_filter_ids) if removed_policy is not None: self._removed_policies[policy_id] = removed_policy self._calc_stats() def get_message(self): """expose the copy of the message""" return deepcopy(self._message) def __str__(self): """to string""" return json.dumps(self._message) def _iter_over_removed_policies(self): """generator of iterator over removed_policies""" for (policy_id, value) in self._removed_policies.items(): yield (policy_id, value) def _iter_over_latest_policies(self): """generator of iterator over latest_policies and policy_filter_matches""" for (policy_id, policy) in self._latest_policies.items(): yield (policy_id, policy, self._policy_filter_matches.get(policy_id)) def gen_segmented_messages(self, max_msg_length_mb): """ Break the policy-update message into a list of segmented messages. Each segmented message should not exceed the max_msg_length_mb from config. """ max_msg_length_mb = (max_msg_length_mb or 10) * PolicyUpdateMessage.BYTES_IN_MB messages = [] curr_message = PolicyUpdateMessage(catch_up=self._catch_up) for (policy_id, value) in self._iter_over_removed_policies(): if (not curr_message.empty() and (len(policy_id) + len(str(value)) + curr_message.msg_length > max_msg_length_mb)): messages.append(curr_message.get_message()) curr_message = PolicyUpdateMessage(catch_up=self._catch_up) curr_message.add(policy_id, removed_policy=value) for (policy_id, policy, policy_filter_ids) in self._iter_over_latest_policies(): if (not curr_message.empty() and (2 * len(policy_id) + len(json.dumps(policy)) + len(json.dumps(policy_filter_ids)) + curr_message.msg_length > max_msg_length_mb)): messages.append(curr_message.get_message()) curr_message = PolicyUpdateMessage(catch_up=self._catch_up) curr_message.add(policy_id, latest_policy=policy, policy_filter_ids=policy_filter_ids) if not curr_message.empty(): messages.append(curr_message.get_message()) msg_count = len(messages) if msg_count > 1: msg_count = "/" + str(msg_count) for idx, msg in enumerate(messages): msg["data_segment"] = str((idx+1)) + msg_count return messages class DeployHandler(object): """calling the deployment-handler web apis""" DEFAULT_TARGET_ENTITY = "deployment_handler" DEFAULT_TIMEOUT_IN_SECS = 60 _lazy_inited = False _lock = Lock() _settings = Settings(Config.POOL_CONNECTIONS, Config.DEPLOY_HANDLER) _requests_session = None _url = None _url_policy = None _max_msg_length_mb = 10 _query = {} _target_entity = None _custom_kwargs = {} _server_instance_uuid = None _timeout_in_secs = DEFAULT_TIMEOUT_IN_SECS server_instance_changed = False @staticmethod def _init(audit): """set config""" DeployHandler._custom_kwargs = {} if not DeployHandler._requests_session: DeployHandler._requests_session = requests.Session() changed, pool_size = DeployHandler._settings.get_by_key(Config.POOL_CONNECTIONS, 10) if changed: DeployHandler._requests_session.mount( 'https://', requests.adapters.HTTPAdapter(pool_connections=1, pool_maxsize=pool_size)) DeployHandler._requests_session.mount( 'http://', requests.adapters.HTTPAdapter(pool_connections=1, pool_maxsize=pool_size)) _, config_dh = DeployHandler._settings.get_by_key(Config.DEPLOY_HANDLER) if config_dh and isinstance(config_dh, dict): # dns based routing to deployment-handler # config for policy-handler >= 2.4.0 # "deploy_handler" : { # "target_entity" : "deployment_handler", # "url" : "https://deployment_handler:8188", # "max_msg_length_mb" : 10, # "query" : { # "cfy_tenant_name" : "default_tenant" # }, # "tls_ca_mode" : "cert_directory", # "timeout_in_secs": 60 # } DeployHandler._target_entity = config_dh.get(TARGET_ENTITY, DeployHandler.DEFAULT_TARGET_ENTITY) DeployHandler._url = config_dh.get("url") DeployHandler._max_msg_length_mb = config_dh.get("max_msg_length_mb", DeployHandler._max_msg_length_mb) DeployHandler._query = deepcopy(config_dh.get("query", {})) tls_ca_mode = config_dh.get(Config.TLS_CA_MODE) DeployHandler._custom_kwargs = Config.get_requests_kwargs(tls_ca_mode) _LOGGER.info( "dns based routing to %s: url(%s) tls_ca_mode(%s) custom_kwargs(%s)", DeployHandler._target_entity, DeployHandler._url, tls_ca_mode, json.dumps(DeployHandler._custom_kwargs)) DeployHandler._timeout_in_secs = config_dh.get(Config.TIMEOUT_IN_SECS) if not DeployHandler._timeout_in_secs or DeployHandler._timeout_in_secs < 1: DeployHandler._timeout_in_secs = DeployHandler.DEFAULT_TIMEOUT_IN_SECS if not DeployHandler._url: # discover routing to deployment-handler at consul-services if not isinstance(config_dh, dict): # config for policy-handler <= 2.3.1 # "deploy_handler" : "deployment_handler" DeployHandler._target_entity = str(config_dh or DeployHandler.DEFAULT_TARGET_ENTITY) DeployHandler._url = DiscoveryClient.get_service_url(audit, DeployHandler._target_entity) DeployHandler._url_policy = str(DeployHandler._url or "") + '/policy' _LOGGER.info("got %s policy url(%s): %s", DeployHandler._target_entity, DeployHandler._url_policy, DeployHandler._settings) DeployHandler._settings.commit_change() DeployHandler._lazy_inited = bool(DeployHandler._url) @staticmethod def reconfigure(audit): """reconfigure""" with DeployHandler._lock: DeployHandler._settings.set_config(Config.discovered_config) if not DeployHandler._settings.is_changed(): DeployHandler._settings.commit_change() return False DeployHandler._lazy_inited = False DeployHandler._init(audit) return True @staticmethod def _lazy_init(audit): """set config""" if DeployHandler._lazy_inited: return with DeployHandler._lock: if DeployHandler._lazy_inited: return DeployHandler._settings.set_config(Config.discovered_config) DeployHandler._init(audit) @staticmethod def policy_update(audit, policy_update_message): """ segments the big policy_update_message limited by size and sequatially sends each segment as put to deployment-handler at /policy. param policy_update_message is of PolicyUpdateMessage type """ if not policy_update_message or policy_update_message.empty(): return DeployHandler._lazy_init(audit) str_metrics = "policy_update {}".format(str(policy_update_message)) metrics_total = Metrics( aud_parent=audit, targetEntity="{} total policy_update".format(DeployHandler._target_entity), targetServiceName=DeployHandler._url_policy) metrics_total.metrics_start("started {}".format(str_metrics)) messages = policy_update_message.gen_segmented_messages(DeployHandler._max_msg_length_mb) for message in messages: DeployHandler._policy_update(audit, message) if not audit.is_success(): break metrics_total.metrics("done {}".format(str_metrics)) @staticmethod def _policy_update(audit, message): """ sends the put message to deployment-handler at /policy detects whether server_instance_changed condition on deployment-handler that is the cause to catch_up """ if not message: return with DeployHandler._lock: session = DeployHandler._requests_session target_entity = DeployHandler._target_entity url = DeployHandler._url_policy params = deepcopy(DeployHandler._query) timeout_in_secs = DeployHandler._timeout_in_secs custom_kwargs = deepcopy(DeployHandler._custom_kwargs) metrics = Metrics(aud_parent=audit, targetEntity="{} policy_update".format(target_entity), targetServiceName=url) headers = metrics.put_request_id_into_headers() log_action = "put to {} at {}".format(target_entity, url) log_data = "msg={} headers={}, params={}, timeout_in_secs={}, custom_kwargs({})".format( json.dumps(message), json.dumps(headers), json.dumps(params), timeout_in_secs, json.dumps(custom_kwargs)) log_line = log_action + " " + log_data _LOGGER.info(log_line) metrics.metrics_start(log_line) if not DeployHandler._url: error_msg = "no url found to {0}".format(log_line) _LOGGER.error(error_msg) metrics.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value) audit.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value) metrics.metrics(error_msg) return res = None try: res = session.put(url, json=message, headers=headers, params=params, timeout=timeout_in_secs, **custom_kwargs) except Exception as ex: error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value if isinstance(ex, requests.exceptions.RequestException) else AuditHttpCode.SERVER_INTERNAL_ERROR.value) error_msg = "failed to {} {}: {} {}".format( log_action, type(ex).__name__, str(ex), log_data) _LOGGER.exception(metrics.fatal(error_msg)) metrics.set_http_status_code(error_code) audit.set_http_status_code(error_code) metrics.metrics(error_msg) return metrics.set_http_status_code(res.status_code) audit.set_http_status_code(res.status_code) log_line = "response {} from {}: text={} {}".format( res.status_code, log_action, res.text, log_data) metrics.metrics(log_line) if res.status_code != requests.codes.ok: _LOGGER.error(log_line) return _LOGGER.info(log_line) result = res.json() or {} DeployHandler._server_instance_changed(result, metrics) @staticmethod def get_deployed_policies(audit): """ Retrieves policies and policy-filters from components that were deployed by deployment-handler """ DeployHandler._lazy_init(audit) with DeployHandler._lock: session = DeployHandler._requests_session target_entity = DeployHandler._target_entity url = DeployHandler._url_policy params = deepcopy(DeployHandler._query) timeout_in_secs = DeployHandler._timeout_in_secs custom_kwargs = deepcopy(DeployHandler._custom_kwargs) metrics = Metrics(aud_parent=audit, targetEntity="{} get_deployed_policies".format(target_entity), targetServiceName=url) headers = metrics.put_request_id_into_headers() log_action = "get from {} at {}".format(target_entity, url) log_data = "headers={}, params={}, timeout_in_secs={}, custom_kwargs({})".format( json.dumps(headers), json.dumps(params), timeout_in_secs, json.dumps(custom_kwargs)) log_line = log_action + " " + log_data _LOGGER.info(log_line) metrics.metrics_start(log_line) if not DeployHandler._url: error_msg = "no url found to {}".format(log_line) _LOGGER.error(error_msg) metrics.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value) audit.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value) metrics.metrics(error_msg) return {"error": "failed to retrieve policies from deployment-handler"}, None, None res = None try: res = session.get(url, headers=headers, params=params, timeout=timeout_in_secs, **custom_kwargs) except Exception as ex: error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value if isinstance(ex, requests.exceptions.RequestException) else AuditHttpCode.SERVER_INTERNAL_ERROR.value) error_msg = "failed to {} {}: {} {}".format( log_action, type(ex).__name__, str(ex), log_data) _LOGGER.exception(metrics.fatal(error_msg)) metrics.set_http_status_code(error_code) audit.set_http_status_code(error_code) metrics.metrics(error_msg) return {"error": "failed to retrieve policies from deployment-handler"}, None, None metrics.set_http_status_code(res.status_code) audit.set_http_status_code(res.status_code) log_line = "response {} from {}: text={} {}".format( res.status_code, log_action, res.text, log_data) metrics.metrics(log_line) if res.status_code != requests.codes.ok: _LOGGER.error(log_line) return {"error": "failed to retrieve policies from deployment-handler"}, None, None result = res.json() or {} DeployHandler._server_instance_changed(result, metrics) policies = result.get(POLICIES, {}) policy_filters = result.get(POLICY_FILTERS, {}) if not policies and not policy_filters: audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_OK.value) _LOGGER.warning(audit.warn( "found no deployed policies or policy-filters: {}".format(log_line), error_code=AuditResponseCode.DATA_ERROR)) return {"warning": "got no deployed policies"}, None, None _LOGGER.info(log_line) return None, policies, policy_filters @staticmethod def _server_instance_changed(result, metrics): """Checks whether the deployment-handler instance changed since last call.""" prev_server_instance_uuid = DeployHandler._server_instance_uuid DeployHandler._server_instance_uuid = result.get("server_instance_uuid") if (prev_server_instance_uuid and prev_server_instance_uuid != DeployHandler._server_instance_uuid): DeployHandler.server_instance_changed = True _LOGGER.info(metrics.info( "deployment_handler_changed: {1} != {0}" .format(prev_server_instance_uuid, DeployHandler._server_instance_uuid)))