summaryrefslogtreecommitdiffstats
path: root/policyhandler/deploy_handler.py
diff options
context:
space:
mode:
authorAlex Shatov <alexs@att.com>2018-08-07 12:11:35 -0400
committerAlex Shatov <alexs@att.com>2018-08-07 12:11:35 -0400
commitd7f34d4b71ec4d86547628cda351d20bff4d017f (patch)
tree101c7669fb5508a103894e262964da0d0c8319bc /policyhandler/deploy_handler.py
parenta29f70823b18f492417629f56c86f61f94b96af8 (diff)
4.0.0 new dataflow on policy-update and catchup
- changed API and functionality - new dataflow - new dataflow between policy-handler and deployment-handler on policy-update and catchup = GETting policy_ids+versions and policy-filters from deployment-handler = PUTting policy-update and catchup in the new message format = data segmenting the policy-update/catchup messages to deployment-handler to avoid 413 on deployment-handler side = matching policies from policy-engine to policies and policy-filters from deployment-handler = coarsening the policyName filter received from deployment-handler to reduce the number messages passed to policy-engine on catchup = consolidating sequential policy-updates into a single request when the policy-update is busy - removed policy scope-prefixes from config and logic - it is not needed anymore because = the policy matching happens directly to policies and policy-filters received from deployment-handler = on catchup - the policy scope-prefix equivalents are calculated based on the data received from deployment-handler - API - GET /policies_latest now returns the info on deployed policy_ids+versions and policy-filters, rather than policies of the scope-prefixes previously found in config (obsolete) - not sending an empty catch_up message to deployment-handler when nothing changed - send policy-removed to deployment-handler when getting 404-not found from PDP on removal of policy - config change: removed catch_up.max_skips - obsolete - brought the latest CommonLogger.py - minor refactoring - improved naming of variables Change-Id: I36b3412eefd439088cb693703a6e5f18f4238b00 Signed-off-by: Alex Shatov <alexs@att.com> Issue-ID: DCAEGEN2-492
Diffstat (limited to 'policyhandler/deploy_handler.py')
-rw-r--r--policyhandler/deploy_handler.py266
1 files changed, 241 insertions, 25 deletions
diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py
index ea703f4..6b7788c 100644
--- a/policyhandler/deploy_handler.py
+++ b/policyhandler/deploy_handler.py
@@ -16,32 +16,144 @@
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-""" send notification to deploy-handler"""
+"""send policy-update notification to deployment-handler"""
import json
import logging
+from copy import copy, deepcopy
import requests
from .config import Config
from .customize import CustomizerUser
from .discovery import DiscoveryClient
-from .onap.audit import REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, Metrics
+from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode,
+ AuditResponseCode, Metrics)
+from .policy_consts import (CATCH_UP, LATEST_POLICIES, POLICIES,
+ POLICY_FILTER_MATCHES, POLICY_FILTERS,
+ REMOVED_POLICIES)
+
+
+class PolicyUpdateMessage(object):
+ """class for messages to deployment-handler on policy-update"""
+ BYTES_IN_MB = 1 << 2 * 10
+
+ def __init__(self, latest_policies=None,
+ removed_policies=None, policy_filter_matches=None, catch_up=True):
+ """init"""
+ self._catch_up = catch_up
+ self._latest_policies = deepcopy(latest_policies or {})
+ self._removed_policies = copy(removed_policies or {})
+ self._policy_filter_matches = deepcopy(policy_filter_matches or {})
+
+ self._message = {
+ CATCH_UP: self._catch_up,
+ LATEST_POLICIES: self._latest_policies,
+ REMOVED_POLICIES: self._removed_policies,
+ POLICY_FILTER_MATCHES: self._policy_filter_matches
+ }
+ self.msg_length = 0
+ self._calc_stats()
+
+ def _calc_stats(self):
+ """generate the message and calc stats"""
+ self.msg_length = len(json.dumps(self._message))
+
+ def empty(self):
+ """checks whether have any data"""
+ return (not self._latest_policies
+ and not self._removed_policies
+ and not self._policy_filter_matches)
+
+ def add(self, policy_id, latest_policy=None, policy_filter_ids=None, removed_policy=None):
+ """add the parts from the other message to the current message"""
+ if not policy_id or not (latest_policy or policy_filter_ids or removed_policy):
+ return
+
+ if latest_policy:
+ self._latest_policies[policy_id] = deepcopy(latest_policy)
+
+ if policy_filter_ids:
+ if policy_id not in self._policy_filter_matches:
+ self._policy_filter_matches[policy_id] = {}
+ self._policy_filter_matches[policy_id].update(policy_filter_ids)
+
+ if removed_policy is not None:
+ self._removed_policies[policy_id] = removed_policy
+
+ self._calc_stats()
+
+ def get_message(self):
+ """expose the copy of the message"""
+ return deepcopy(self._message)
+
+ def __str__(self):
+ """to string"""
+ return json.dumps(self._message)
+
+ def _iter_over_removed_policies(self):
+ """generator of iterator over removed_policies"""
+ for (policy_id, value) in self._removed_policies.items():
+ yield (policy_id, value)
+
+ def _iter_over_latest_policies(self):
+ """generator of iterator over latest_policies and policy_filter_matches"""
+ for (policy_id, policy) in self._latest_policies.items():
+ yield (policy_id, policy, self._policy_filter_matches.get(policy_id))
+
+ def gen_segmented_messages(self, max_msg_length_mb):
+ """
+ Break the policy-update message into a list of segmented messages.
+
+ Each segmented message should not exceed the max_msg_length_mb from config.
+ """
+ max_msg_length_mb = (max_msg_length_mb or 10) * PolicyUpdateMessage.BYTES_IN_MB
+
+ messages = []
+ curr_message = PolicyUpdateMessage(catch_up=self._catch_up)
+
+ for (policy_id, value) in self._iter_over_removed_policies():
+ if (not curr_message.empty()
+ and (len(policy_id) + len(str(value)) + curr_message.msg_length
+ > max_msg_length_mb)):
+ messages.append(curr_message.get_message())
+ curr_message = PolicyUpdateMessage(catch_up=self._catch_up)
+ curr_message.add(policy_id, removed_policy=value)
+
+ for (policy_id, policy, policy_filter_ids) in self._iter_over_latest_policies():
+ if (not curr_message.empty()
+ and (2 * len(policy_id) + len(json.dumps(policy))
+ + len(json.dumps(policy_filter_ids))
+ + curr_message.msg_length > max_msg_length_mb)):
+ messages.append(curr_message.get_message())
+ curr_message = PolicyUpdateMessage(catch_up=self._catch_up)
+ curr_message.add(policy_id, latest_policy=policy, policy_filter_ids=policy_filter_ids)
+
+ if not curr_message.empty():
+ messages.append(curr_message.get_message())
+
+ msg_count = len(messages)
+ if msg_count > 1:
+ msg_count = "/" + str(msg_count)
+ for idx, msg in enumerate(messages):
+ msg["data_segment"] = str((idx+1)) + msg_count
+
+ return messages
-POOL_SIZE = 1
class DeployHandler(object):
- """ deploy-handler """
+ """calling the deployment-handler web apis"""
_logger = logging.getLogger("policy_handler.deploy_handler")
_lazy_inited = False
_requests_session = None
- _config = None
_url = None
_url_policy = None
+ _max_msg_length_mb = 10
_target_entity = None
_custom_kwargs = None
_server_instance_uuid = None
+ server_instance_changed = False
@staticmethod
def _lazy_init(audit, rediscover=False):
@@ -56,14 +168,15 @@ class DeployHandler(object):
DeployHandler._custom_kwargs = {}
if not DeployHandler._requests_session:
+ pool_size = Config.settings.get("pool_connections", 20)
DeployHandler._requests_session = requests.Session()
DeployHandler._requests_session.mount(
'https://',
- requests.adapters.HTTPAdapter(pool_connections=POOL_SIZE, pool_maxsize=POOL_SIZE)
+ requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size)
)
DeployHandler._requests_session.mount(
'http://',
- requests.adapters.HTTPAdapter(pool_connections=POOL_SIZE, pool_maxsize=POOL_SIZE)
+ requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size)
)
config_dh = Config.settings.get("deploy_handler")
@@ -72,10 +185,13 @@ class DeployHandler(object):
# config for policy-handler >= 2.4.0
# "deploy_handler" : {
# "target_entity" : "deployment_handler",
- # "url" : "http://deployment_handler:8188"
+ # "url" : "http://deployment_handler:8188",
+ # "max_msg_length_mb" : 100
# }
DeployHandler._target_entity = config_dh.get("target_entity", "deployment_handler")
DeployHandler._url = config_dh.get("url")
+ DeployHandler._max_msg_length_mb = config_dh.get("max_msg_length_mb",
+ DeployHandler._max_msg_length_mb)
DeployHandler._logger.info("dns based routing to %s: url(%s)",
DeployHandler._target_entity, DeployHandler._url)
@@ -96,26 +212,52 @@ class DeployHandler(object):
@staticmethod
- def policy_update(audit, message, rediscover=False):
+ def policy_update(audit, policy_update_message, rediscover=False):
"""
- post policy_updated message to deploy-handler
+ segments the big policy_update_message limited by size
+ and sequatially sends each segment as put to deployment-handler at /policy.
- returns condition whether it needs to catch_up
+ param policy_update_message is of PolicyUpdateMessage type
"""
- if not message:
+ if not policy_update_message or policy_update_message.empty():
return
DeployHandler._lazy_init(audit, rediscover)
+
+ str_metrics = "policy_update {0}".format(str(policy_update_message))
+
+ metrics_total = Metrics(
+ aud_parent=audit,
+ targetEntity="{0} total policy_update".format(DeployHandler._target_entity),
+ targetServiceName=DeployHandler._url_policy)
+
+ metrics_total.metrics_start("started {}".format(str_metrics))
+ messages = policy_update_message.gen_segmented_messages(DeployHandler._max_msg_length_mb)
+ for message in messages:
+ DeployHandler._policy_update(audit, message)
+ if not audit.is_success():
+ break
+ metrics_total.metrics("done {}".format(str_metrics))
+
+ @staticmethod
+ def _policy_update(audit, message):
+ """
+ sends the put message to deployment-handler at /policy
+
+ detects whether server_instance_changed condition on deployment-handler
+ that is the cause to catch_up
+ """
+ if not message:
+ return
+
metrics = Metrics(aud_parent=audit, targetEntity=DeployHandler._target_entity,
targetServiceName=DeployHandler._url_policy)
headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id}
- msg_str = json.dumps(message)
- headers_str = json.dumps(headers)
-
- log_action = "post to {0} at {1}".format(
+ log_action = "put to {0} at {1}".format(
DeployHandler._target_entity, DeployHandler._url_policy)
- log_data = " msg={0} headers={1}".format(msg_str, headers_str)
+ log_data = " msg={0} headers={1}".format(json.dumps(message),
+ json.dumps(headers))
log_line = log_action + log_data
DeployHandler._logger.info(log_line)
metrics.metrics_start(log_line)
@@ -130,7 +272,7 @@ class DeployHandler(object):
res = None
try:
- res = DeployHandler._requests_session.post(
+ res = DeployHandler._requests_session.put(
DeployHandler._url_policy, json=message, headers=headers,
**DeployHandler._custom_kwargs
)
@@ -149,8 +291,8 @@ class DeployHandler(object):
metrics.set_http_status_code(res.status_code)
audit.set_http_status_code(res.status_code)
- log_line = "response {0} from {1}: text={2}{3}" \
- .format(res.status_code, log_action, res.text, log_data)
+ log_line = "response {0} from {1}: text={2}{3}".format(res.status_code, log_action,
+ res.text, log_data)
metrics.metrics(log_line)
if res.status_code != requests.codes.ok:
@@ -159,15 +301,89 @@ class DeployHandler(object):
DeployHandler._logger.info(log_line)
result = res.json() or {}
+ DeployHandler._server_instance_changed(result, metrics)
+
+
+ @staticmethod
+ def get_deployed_policies(audit, rediscover=False):
+ """
+ Retrieves policies and policy-filters from components
+ that were deployed by deployment-handler
+ """
+ DeployHandler._lazy_init(audit, rediscover)
+ metrics = Metrics(aud_parent=audit, targetEntity=DeployHandler._target_entity,
+ targetServiceName=DeployHandler._url_policy)
+ headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id}
+
+ log_action = "get {0}: {1}".format(DeployHandler._target_entity, DeployHandler._url_policy)
+ log_data = " headers={}".format(json.dumps(headers))
+ log_line = log_action + log_data
+ DeployHandler._logger.info(log_line)
+ metrics.metrics_start(log_line)
+
+ if not DeployHandler._url:
+ error_msg = "no url found to {0}".format(log_line)
+ DeployHandler._logger.error(error_msg)
+ metrics.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
+ audit.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
+ metrics.metrics(error_msg)
+ return None, None
+
+ res = None
+ try:
+ res = DeployHandler._requests_session.get(
+ DeployHandler._url_policy, headers=headers,
+ **DeployHandler._custom_kwargs
+ )
+ except Exception as ex:
+ error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
+ if isinstance(ex, requests.exceptions.RequestException)
+ else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+ error_msg = ("failed to {0} {1}: {2}{3}"
+ .format(log_action, type(ex).__name__, str(ex), log_data))
+ DeployHandler._logger.exception(error_msg)
+ metrics.set_http_status_code(error_code)
+ audit.set_http_status_code(error_code)
+ metrics.metrics(error_msg)
+ return None, None
+
+ metrics.set_http_status_code(res.status_code)
+ audit.set_http_status_code(res.status_code)
+
+ log_line = ("response {0} from {1}: text={2}{3}"
+ .format(res.status_code, log_action, res.text, log_data))
+ metrics.metrics(log_line)
+
+ if res.status_code != requests.codes.ok:
+ DeployHandler._logger.error(log_line)
+ return None, None
+
+ result = res.json() or {}
+ DeployHandler._server_instance_changed(result, metrics)
+
+ policies = result.get(POLICIES, {})
+ policy_filters = result.get(POLICY_FILTERS, {})
+ if not policies and not policy_filters:
+ audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
+ DeployHandler._logger.warning(audit.warn(
+ "found no deployed policies or policy-filters: {}".format(log_line),
+ error_code=AuditResponseCode.DATA_ERROR))
+ return policies, policy_filters
+
+ DeployHandler._logger.info(log_line)
+ return policies, policy_filters
+
+ @staticmethod
+ def _server_instance_changed(result, metrics):
+ """Checks whether the deployment-handler instance changed since last call."""
prev_server_instance_uuid = DeployHandler._server_instance_uuid
DeployHandler._server_instance_uuid = result.get("server_instance_uuid")
- deployment_handler_changed = (prev_server_instance_uuid
- and prev_server_instance_uuid != DeployHandler._server_instance_uuid)
- if deployment_handler_changed:
+ if (prev_server_instance_uuid
+ and prev_server_instance_uuid != DeployHandler._server_instance_uuid):
+ DeployHandler.server_instance_changed = True
+
log_line = ("deployment_handler_changed: {1} != {0}"
.format(prev_server_instance_uuid, DeployHandler._server_instance_uuid))
metrics.info(log_line)
DeployHandler._logger.info(log_line)
-
- return deployment_handler_changed