From 90f21051fc4347a84a2c079c994e03abbfec5c62 Mon Sep 17 00:00:00 2001 From: Alex Shatov Date: Wed, 10 Jan 2018 11:39:32 -0500 Subject: variable collection of policies per component * new feature variable collection of policies per component in DCAE * massive refactoring * Unit Test coverage 100% * moved module docstring below the license text Change-Id: I5ba392cb5c42ec136306772163c370d64974ae3c Issue-ID: DCAEGEN2-249 Signed-off-by: Alex Shatov --- .../onap_dcae_dcaepolicy_lib/dcae_policy.py | 556 +++++++++++++++++---- 1 file changed, 452 insertions(+), 104 deletions(-) (limited to 'onap-dcae-dcaepolicy-lib/onap_dcae_dcaepolicy_lib/dcae_policy.py') diff --git a/onap-dcae-dcaepolicy-lib/onap_dcae_dcaepolicy_lib/dcae_policy.py b/onap-dcae-dcaepolicy-lib/onap_dcae_dcaepolicy_lib/dcae_policy.py index 6a277f3..740f736 100644 --- a/onap-dcae-dcaepolicy-lib/onap_dcae_dcaepolicy_lib/dcae_policy.py +++ b/onap-dcae-dcaepolicy-lib/onap_dcae_dcaepolicy_lib/dcae_policy.py @@ -1,5 +1,3 @@ -"""dcae_policy contains decorators for the policy lifecycle in cloudify""" - # org.onap.dcae # ================================================================================ # Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. @@ -19,111 +17,369 @@ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. +"""dcae_policy contains decorators for the policy lifecycle in cloudify""" + import json -import copy +from copy import deepcopy from functools import wraps +import traceback from cloudify import ctx from cloudify.context import NODE_INSTANCE from cloudify.exceptions import NonRecoverableError +from .utils import Utils + POLICIES = 'policies' +POLICY_FILTERS = 'policy_filters' +POLICIES_FILTERED = 'policies_filtered' +POLICY_APPLY_ORDER = 'policy_apply_order' +POLICY_APPLY_ORDER_CLAUSE = 'policy_apply_order_clause' +POLICY_DEFAULTED_FIELDS = 'policy_defaulted_fields' POLICY_ID = 'policy_id' POLICY_BODY = 'policy_body' POLICY_VERSION = "policyVersion" POLICY_CONFIG = 'config' +APPLICATION_CONFIG = 'application_config' +POLICY_FILTER = 'policy_filter' +POLICY_FILTER_ID = 'policy_filter_id' + +POLICY_PERSISTENT = 'policy_persistent' DCAE_POLICY_TYPE = 'dcae.nodes.policy' +DCAE_POLICIES_TYPE = 'dcae.nodes.policies' POLICY_MESSAGE_TYPE = 'policy' class Policies(object): """static class for policy operations""" + _updated_policies = {} + _removed_policies = {} @staticmethod - def gather_policies_to_node(func): - """decorate with @Policies.gather_policies_to_node to - gather the policies from dcae.nodes.policy nodes this node depends on. + def _init(): + """init static members""" + Policies._updated_policies = {} + Policies._removed_policies = {} - Places the policies into runtime_properties["policies"]. + @staticmethod + def _get_config_from_policy(policy): + """returns the config field from policy object""" + return (policy or {}).get(POLICY_BODY, {}).get(POLICY_CONFIG) + + @staticmethod + def _store_policy_apply_order_clause(policy_apply_order_path): + """ + Find the field :policy_apply_order_path: that is provided as an optional parameter + to gather_policies_to_node decorator. - Call Policies.shallow_merge_policies_into(config) to merge the policies into config. + Parse the field-pathes found in the :policy_apply_order_path: field. + + Store the result into :runtime_properties[POLICY_APPLY_ORDER_CLAUSE]: + + Example: + policy_apply_order_path = "docker_config:policy:apply_order" + will take the list of field-pathes from the field + properties: + docker_config: + policy: + apply_order: """ - def _merge_policy_with_node(target): - """get all properties of the policy node and add the actual policy""" - policy = dict(target.node.properties) - if POLICY_BODY in target.instance.runtime_properties: - policy[POLICY_BODY] = target.instance.runtime_properties[POLICY_BODY] - return policy + if not policy_apply_order_path: + return + + policy_apply_order_clause = Utils.get_field_value( + dict(ctx.node.properties), + policy_apply_order_path + ) - if not func: + if not policy_apply_order_clause: + ctx.logger.warn("not found policy_apply_order_path: {0} in node.properties" + .format(policy_apply_order_path)) return - @wraps(func) - def wrapper(*args, **kwargs): - """gather and save the policies from dcae.nodes.policy nodes this node related to""" - try: - if ctx.type == NODE_INSTANCE: - policies = dict([(rel.target.node.properties[POLICY_ID], \ - _merge_policy_with_node(rel.target)) \ - for rel in ctx.instance.relationships \ - if DCAE_POLICY_TYPE in rel.target.node.type_hierarchy \ - and POLICY_ID in rel.target.node.properties \ - and rel.target.node.properties[POLICY_ID] \ - ]) - if policies: - ctx.instance.runtime_properties[POLICIES] = policies - except Exception as ex: - error = "Failed to set the policies {0}".format(str(ex)) - ctx.logger.error(error) - raise NonRecoverableError(error) - - return func(*args, **kwargs) - return wrapper + ctx.instance.runtime_properties[POLICY_APPLY_ORDER_CLAUSE] = policy_apply_order_clause @staticmethod - def _update_policies_on_ctx(updated_policies): - """update policies in runtime_properties and return changed_policies""" - if POLICIES not in ctx.instance.runtime_properties: + def _set_policy_apply_order(policies): + """ + Calculates, sorts and stores the policy_apply_order for policies. + + Sorting is done based on the list of field-pathes + specified in :runtime_properties[POLICY_APPLY_ORDER_CLAUSE]: + + The apply_order field is expected to be formatted as the list of strings. + Each string can contain the path to the field inside the policy_body object + with the same delimiter of ":" (semicolon). + To convert the field to decimal, use ::number suffix after the field name. + To specify the descending order, add "desc" after the whitespace. + + Example: + apply_order = ["matchingConditions:priority::number desc", + "config:foo desc nulls-last", "config:db_client"] + + this will return the policies starting with the highest decimal value in "priority" + field inside the "matchingConditions". + Then the policies with the same "priority" values will be sorted in descending order + by the string value in the field "foo" found inside the "config". + Then the policies with the same "priority" and "foo" values will be sorted + by the string value in the field "db_client" found inside the "config". + Then the policies with the same "priority" and "foo" and "db_client" field values will + further be always sorted by "policy_id" - no need to specify that in apply_order. + Sorting by policy_id insures the uniqueness and predictability of the policy apply_order. + + An invalid field-path will result in the value of None that brings this record upfront. + """ + policy_apply_order = [policy_id + for (policy_id, policy) in policies.iteritems() + if Policies._get_config_from_policy(policy)] + + if not policy_apply_order: + ctx.instance.runtime_properties[POLICY_APPLY_ORDER] = policy_apply_order return - if not updated_policies: - ctx.logger.error("update_policies_on_ctx - no updated_policies provided in arguments") + + policy_apply_order.sort() + + policy_apply_order_clause = ctx.instance.runtime_properties.get(POLICY_APPLY_ORDER_CLAUSE) + if not policy_apply_order_clause: + ctx.instance.runtime_properties[POLICY_APPLY_ORDER] = policy_apply_order + return + + if not isinstance(policy_apply_order_clause, list): + policy_apply_order_clause = [policy_apply_order_clause] + + for clause_item in reversed(policy_apply_order_clause): + f_path, f_type, desc, n_last = Utils.parse_clause_item(clause_item) + + if not f_path: + continue + + policy_apply_order.sort( + key=lambda policy_id, fpath=f_path, ftype=f_type, reverse=desc, nulls_last=n_last: + Utils.key_with_none_in_sort( + reverse, nulls_last, + Utils.get_field_value( + policies.get(policy_id, {}).get(POLICY_BODY, {}), + fpath, + field_type=ftype + ) + ), reverse=desc + ) + + ctx.instance.runtime_properties[POLICY_APPLY_ORDER] = policy_apply_order + + @staticmethod + def _set_policy_defaulted_fields(policies): + """ + Keeps track and stores the dict of field names of removed policies into + :runtime_properties[POLICY_DEFAULTED_FIELDS]: + """ + policy_defaulted_fields = ctx.instance.runtime_properties.get(POLICY_DEFAULTED_FIELDS, {}) + policy_defaulted_fields.update( + (k, True) + for policy in Policies._removed_policies.itervalues() + for k in Policies._get_config_from_policy(policy) or {} + ) + if policies: + for policy in policies.itervalues(): + for k in Policies._get_config_from_policy(policy) or {}: + if k in policy_defaulted_fields: + del policy_defaulted_fields[k] + + ctx.instance.runtime_properties[POLICY_DEFAULTED_FIELDS] = policy_defaulted_fields + + @staticmethod + def _set_policies(policies): + """ + store the :policies: in :runtime_properties[POLICIES]: + + and build an index on policies into :runtime_properties[POLICY_APPLY_ORDER]: + + and keep track of fields from previously :removed: policy-configs in + :runtime_properties[POLICY_DEFAULTED_FIELDS]: to reset them to default values + on merging the policies into config + """ + Policies._set_policy_defaulted_fields(policies) + if not policies: + if POLICIES in ctx.instance.runtime_properties: + del ctx.instance.runtime_properties[POLICIES] + if POLICY_APPLY_ORDER in ctx.instance.runtime_properties: + del ctx.instance.runtime_properties[POLICY_APPLY_ORDER] + return + + ctx.instance.runtime_properties[POLICIES] = policies + Policies._set_policy_apply_order(policies) + + @staticmethod + def gather_policies_to_node(policy_apply_order_path=None): + """ + decorate with @Policies.gather_policies_to_node to + gather the policies from dcae.nodes.policy nodes this node depends on. + + Places the policies into runtime_properties["policies"]. + + Call Policies.calc_latest_application_config() to apply policies onto application_config. + """ + def gather_policies_decorator(func): + """the decorator""" + if not func: + return + + def add_policy(policy_id, policy, policy_persistent, policies): + """only add the latest version of policy to policies""" + prev_policy = policies.get(policy_id) + prev_policy_persistent = (prev_policy or {}).get(POLICY_PERSISTENT) + + if not prev_policy \ + or (policy_persistent and not prev_policy_persistent) \ + or (policy_persistent == prev_policy_persistent and policy.get(POLICY_BODY)): + policy = deepcopy(policy) + policy[POLICY_PERSISTENT] = policy_persistent + policies[policy_id] = policy + + def gather_policy(target, policies): + """adds the policy from dcae.nodes.policy node to policies""" + if DCAE_POLICY_TYPE not in target.node.type_hierarchy: + return + policy_id = target.node.properties.get(POLICY_ID) + if not policy_id: + return True + policy = deepcopy(dict(target.node.properties)) + policy_body = target.instance.runtime_properties.get(POLICY_BODY) + if policy_body: + policy[POLICY_BODY] = policy_body + + add_policy(policy_id, policy, True, policies) + return True + + def gather_policies(target, policies, policy_filters): + """adds the policies and policy-filter from dcae.nodes.policies node to policies""" + if DCAE_POLICIES_TYPE not in target.node.type_hierarchy: + return + + property_policy_filter = target.node.properties.get(POLICY_FILTER) + if property_policy_filter: + policy_filter = dict( + (k, v) for (k, v) in dict(property_policy_filter).iteritems() + if v or isinstance(v, (int, float)) + ) + if policy_filter: + policy_filters[target.instance.id] = { + POLICY_FILTER_ID : target.instance.id, + POLICY_FILTER : deepcopy(policy_filter) + } + + filtered_policies = target.instance.runtime_properties.get(POLICIES_FILTERED) + if not filtered_policies or not isinstance(filtered_policies, dict): + return True + for (policy_id, policy) in filtered_policies.iteritems(): + add_policy(policy_id, policy, False, policies) + return True + + @wraps(func) + def wrapper(*args, **kwargs): + """gather and save the policies from dcae.nodes.policy nodes this node related to""" + if ctx.type != NODE_INSTANCE: + raise NonRecoverableError("can only invoke gather_policies_to_node on node") + + try: + Policies._store_policy_apply_order_clause(policy_apply_order_path) + + policies = {} + policy_filters = {} + for rel in ctx.instance.relationships: + _ = gather_policy(rel.target, policies) \ + or gather_policies(rel.target, policies, policy_filters) + + Policies._set_policies(policies) + if policy_filters: + ctx.instance.runtime_properties[POLICY_FILTERS] = policy_filters + except Exception as ex: + error = "Failed to set the policies {0}".format(str(ex)) + ctx.logger.error("{0}: {1}".format(error, traceback.format_exc())) + raise NonRecoverableError(error) + + return func(*args, **kwargs) + return wrapper + return gather_policies_decorator + + @staticmethod + def _update_policies(updated_policies, added_policies, removed_policies): + """ + filter and update policies in runtime_properties + and return the ordered filtered list of changed_policies + """ + Policies._init() + + if not updated_policies and not removed_policies and not added_policies: + ctx.logger.error( + "update_policies_on_ctx - no updated, added, or removed policies received") return - policies = ctx.instance.runtime_properties[POLICIES] - ctx.logger.info("update_policies_on_ctx: {0}".format(json.dumps(updated_policies))) - changed_policies = [] - ignored_policies = [] - unexpected_policies = [] - same_policies = [] + updated_policies = updated_policies or [] + added_policies = added_policies or {} + removed_policies = removed_policies or [] + + ctx.logger.info("updated_policies: {0}, added_policies: {1}, removed_policies: {2}" + .format(json.dumps(updated_policies), + json.dumps(added_policies), + json.dumps(removed_policies))) + + policies = ctx.instance.runtime_properties.get(POLICIES, {}) + policy_filters = ctx.instance.runtime_properties.get(POLICY_FILTERS, {}) + + for policy_id in removed_policies: + removed_policy = policies.get(policy_id) + if removed_policy: + Policies._removed_policies[policy_id] = deepcopy(removed_policy) + if not removed_policy.get(POLICY_PERSISTENT): + del policies[policy_id] + elif POLICY_BODY in removed_policy: + del policies[policy_id][POLICY_BODY] + + new_policies = dict((p_id, policy) + for policy_filter_id in policy_filters + for (p_id, policy) in added_policies + .get(policy_filter_id, {}) + .get(POLICIES, {}) + .iteritems()) + + ctx.logger.info("new_policies: {0}".format(json.dumps(new_policies))) + + for (policy_id, policy) in new_policies.iteritems(): + deployed_policy = policies.get(policy_id) + if not deployed_policy: + policies[policy_id] = policy + Policies._updated_policies[policy_id] = policy + continue + updated_policies.append(policy) + + skipped = {"ignored" : [], "unexpected" : [], "same" : []} for policy in updated_policies: - if POLICY_ID not in policy or policy[POLICY_ID] not in policies: - ignored_policies.append(policy) + policy_id = policy.get(POLICY_ID) + if not policy_id or policy_id not in policies: + skipped["ignored"].append(policy) continue - if POLICY_BODY not in policy or POLICY_VERSION not in policy[POLICY_BODY] \ - or not policy[POLICY_BODY][POLICY_VERSION]: - unexpected_policies.append(policy) + + updated_policy_body = policy.get(POLICY_BODY, {}) + updated_policy_version = updated_policy_body.get(POLICY_VERSION) + if not updated_policy_version or POLICY_CONFIG not in updated_policy_body: + skipped["unexpected"].append(policy) + continue + + deployed_policy = policies.get(policy_id) + deployed_policy_version = deployed_policy.get(POLICY_BODY, {}).get(POLICY_VERSION) + if updated_policy_version == deployed_policy_version: + skipped["same"].append(policy) continue - deployed_policy = policies[policy[POLICY_ID]].get(POLICY_BODY, {}) - new_policy_body = policy[POLICY_BODY] - if not deployed_policy or POLICY_VERSION not in deployed_policy \ - or not deployed_policy[POLICY_VERSION] \ - or deployed_policy[POLICY_VERSION] != new_policy_body[POLICY_VERSION]: - policies[policy[POLICY_ID]][POLICY_BODY] = new_policy_body - changed_policies.append(dict(policies[policy[POLICY_ID]])) - else: - same_policies.append(policy) - - if same_policies: - ctx.logger.info("same policies: {0}".format(json.dumps(same_policies))) - if ignored_policies: - ctx.logger.info("ignored policies: {0}".format(json.dumps(ignored_policies))) - if unexpected_policies: - ctx.logger.warn("unexpected policies: {0}".format(json.dumps(unexpected_policies))) - - if changed_policies: - ctx.instance.runtime_properties[POLICIES] = policies - return changed_policies + policies[policy_id][POLICY_BODY] = updated_policy_body + Policies._updated_policies[policy_id] = policy + + if skipped["same"] or skipped["ignored"] or skipped["unexpected"]: + ctx.logger.info("skipped policies: {0}".format(json.dumps(skipped))) + + if Policies._updated_policies or Policies._removed_policies: + Policies._set_policies(policies) @staticmethod def update_policies_on_node(configs_only=True): @@ -144,56 +400,148 @@ class Policies(object): return @wraps(func) - def wrapper(updated_policies, **kwargs): - """update matching policies on context""" + def wrapper(updated_policies=None, + added_policies=None, + removed_policies=None, + **kwargs): + """update matching policies on the node""" if ctx.type != NODE_INSTANCE: - return + raise NonRecoverableError("can only invoke update_policies_on_node on node") + + try: + Policies._update_policies(updated_policies, added_policies, removed_policies) + + updated_policies = deepcopy(Policies._get_ordered_policies( + selected_policies=Policies._updated_policies + )) + removed_policies = deepcopy(Policies._removed_policies.values()) - updated_policies = Policies._update_policies_on_ctx(updated_policies) - if updated_policies: if configs_only: - updated_policies = [policy[POLICY_BODY][POLICY_CONFIG] \ - for policy in updated_policies \ - if POLICY_BODY in policy \ - and POLICY_CONFIG in policy[POLICY_BODY] \ - ] - return func(updated_policies, **kwargs) + updated_policies = Utils.remove_empties( + [Policies._get_config_from_policy(policy) + for policy in updated_policies or []] + ) + removed_policies = [Policies._get_config_from_policy(policy) + for policy in removed_policies or []] + + except Exception as ex: + error = "Failed to update the policies {0}".format(str(ex)) + ctx.logger.error("{0}: {1}".format(error, traceback.format_exc())) + raise NonRecoverableError(error) + + if updated_policies or removed_policies: + return func(updated_policies, removed_policies=removed_policies, **kwargs) return wrapper return update_policies_decorator + @staticmethod + def _get_ordered_policies(selected_policies=None): + """ + returns the ordered list of selected policies from the runtime policies + ordered by policy_id values + """ + policies = ctx.instance.runtime_properties.get(POLICIES) + apply_order = ctx.instance.runtime_properties.get(POLICY_APPLY_ORDER) + if not policies or not apply_order: + return [] + + if selected_policies is None: + return [policies[policy_id] for policy_id in apply_order] + + if not selected_policies: + return [] + + return [policies[policy_id] for policy_id in apply_order if policy_id in selected_policies] @staticmethod def get_policy_configs(): - """returns the list of policy configs from the runtime policies""" - if ctx.type != NODE_INSTANCE \ - or POLICIES not in ctx.instance.runtime_properties: - return - policies = ctx.instance.runtime_properties[POLICIES] - if not policies: - return - policy_configs = [policies[policy_id][POLICY_BODY][POLICY_CONFIG] \ - for policy_id in policies \ - if POLICY_BODY in policies[policy_id] \ - and POLICY_CONFIG in policies[policy_id][POLICY_BODY] \ - ] - return policy_configs + """ + returns the list of policy configs from the runtime policies + ordered by policy_id values + """ + if ctx.type != NODE_INSTANCE: + return [] + + ordered_policies = Policies._get_ordered_policies() + return Utils.remove_empties( + [Policies._get_config_from_policy(policy) + for policy in ordered_policies] + ) @staticmethod - def shallow_merge_policies_into(config): - """shallow merge the policy configs (dict) into config that is expected to be a dict""" + def shallow_merge_policies_into(config, default_config=None): + """ + shallow merge the :policy configs: (dict) into :config: that is expected to be a dict. + + the fields listed in :runtime_properties[POLICY_DEFAULTED_FIELDS]: + that where ever changed by policy-configs are initially reset to default values + found in :default_config: or :node.properties[APPLICATION_CONFIG]: + on merging the policies into config + """ if config is None: config = {} + policy_configs = Policies.get_policy_configs() - if not policy_configs or not isinstance(config, dict): + + if not config or not isinstance(config, dict): + ctx.logger.warn("unexpected config {0} to merge the policy_configs {1} into" \ + .format(json.dumps(config), json.dumps(policy_configs or []))) + return config + + defaulted_fields = ctx.instance.runtime_properties.get(POLICY_DEFAULTED_FIELDS) + if defaulted_fields: + if default_config is None or not isinstance(default_config, dict): + default_config = dict(ctx.node.properties.get(APPLICATION_CONFIG, {})) + ctx.logger.info("using default_config from node.properties[{0}]: {1}" + .format(APPLICATION_CONFIG, json.dumps(default_config))) + if default_config and isinstance(default_config, dict): + for defaulted_field in defaulted_fields: + if defaulted_field in default_config and defaulted_field in config: + config[defaulted_field] = deepcopy(default_config.get(defaulted_field)) + + ctx.logger.info("inited config {0} on {1} {2} from {3}" + .format(json.dumps(config), + POLICY_DEFAULTED_FIELDS, + json.dumps(defaulted_fields), + json.dumps(default_config))) + + if not policy_configs: + ctx.logger.warn("no policies to merge to config {0}".format(json.dumps(config))) return config - for policy_config in copy.deepcopy(policy_configs): + for policy_config in policy_configs: if not isinstance(policy_config, dict): + ctx.logger.warn("skipped unexpected format of policy_config {0} for config: {1}" \ + .format(json.dumps(policy_config), json.dumps(config))) continue - config.update(policy_config) - for cfg_item in policy_config: - if policy_config[cfg_item] is None: - config.pop(cfg_item, None) + ctx.logger.info("applying policy_config {0} to config {1}" \ + .format(json.dumps(policy_config), json.dumps(config))) + for (policy_key, policy_value) in policy_config.iteritems(): + if policy_key not in config or policy_value is None: + ctx.logger.warn("skipped unexpected policy({0}, {1}) for config: {2}" \ + .format(policy_key, json.dumps(policy_value), json.dumps(config))) + continue + config[policy_key] = deepcopy(policy_value) return config + + @staticmethod + def calc_latest_application_config(application_config_name=None): + """ + shallow merge the policy configs (dict) into config that is expected to be a dict + + if :application_config_name: is not provided, + the runtime property :application_config: on the node instance is used as initial config + """ + if not application_config_name: + application_config_name = APPLICATION_CONFIG + + config = deepcopy(dict(ctx.instance.runtime_properties.get(application_config_name, {}))) + if not config: + config = deepcopy(dict(ctx.node.properties.get(application_config_name, {}))) + + ctx.logger.info("going to merge policies over {0}: {1}" \ + .format(application_config_name, json.dumps(config))) + + return Policies.shallow_merge_policies_into(config) -- cgit 1.2.3-korg