summaryrefslogtreecommitdiffstats
path: root/onap-dcae-dcaepolicy-lib/onap_dcae_dcaepolicy_lib
diff options
context:
space:
mode:
Diffstat (limited to 'onap-dcae-dcaepolicy-lib/onap_dcae_dcaepolicy_lib')
-rw-r--r--onap-dcae-dcaepolicy-lib/onap_dcae_dcaepolicy_lib/__init__.py2
-rw-r--r--onap-dcae-dcaepolicy-lib/onap_dcae_dcaepolicy_lib/dcae_policy.py138
-rw-r--r--onap-dcae-dcaepolicy-lib/onap_dcae_dcaepolicy_lib/policies_output.py131
3 files changed, 235 insertions, 36 deletions
diff --git a/onap-dcae-dcaepolicy-lib/onap_dcae_dcaepolicy_lib/__init__.py b/onap-dcae-dcaepolicy-lib/onap_dcae_dcaepolicy_lib/__init__.py
index 18b1b0c..547b16f 100644
--- a/onap-dcae-dcaepolicy-lib/onap_dcae_dcaepolicy_lib/__init__.py
+++ b/onap-dcae-dcaepolicy-lib/onap_dcae_dcaepolicy_lib/__init__.py
@@ -19,4 +19,4 @@
"""expose the Policies class on the package level"""
-from .dcae_policy import Policies, POLICIES, POLICY_MESSAGE_TYPE
+from .dcae_policy import Policies
diff --git a/onap-dcae-dcaepolicy-lib/onap_dcae_dcaepolicy_lib/dcae_policy.py b/onap-dcae-dcaepolicy-lib/onap_dcae_dcaepolicy_lib/dcae_policy.py
index e4c5cea..b064157 100644
--- a/onap-dcae-dcaepolicy-lib/onap_dcae_dcaepolicy_lib/dcae_policy.py
+++ b/onap-dcae-dcaepolicy-lib/onap_dcae_dcaepolicy_lib/dcae_policy.py
@@ -20,14 +20,15 @@
"""dcae_policy contains decorators for the policy lifecycle in cloudify"""
import json
+import traceback
from copy import deepcopy
from functools import wraps
-import traceback
from cloudify import ctx
from cloudify.context import NODE_INSTANCE
from cloudify.exceptions import NonRecoverableError
+from .policies_output import PoliciesOutput
from .utils import Utils
POLICIES = 'policies'
@@ -47,12 +48,14 @@ POLICY_FILTER = 'policy_filter'
POLICY_FILTER_ID = 'policy_filter_id'
POLICY_TYPE = 'policyType'
POLICY_TYPE_MICROSERVICE = 'MicroService'
-POLICY_CONFIG_MS = "Config_MS"
+POLICY_CONFIG_MS = "Config_MS_"
POLICY_PERSISTENT = 'policy_persistent'
DCAE_POLICY_TYPE = 'dcae.nodes.policy'
DCAE_POLICIES_TYPE = 'dcae.nodes.policies'
-POLICY_MESSAGE_TYPE = 'policy'
+
+ACTION_GATHERED = "gathered"
+ACTION_UPDATED = "updated"
class Policies(object):
"""static class for policy operations"""
@@ -195,15 +198,15 @@ class Policies(object):
"""
policy_defaulted_fields = ctx.instance.runtime_properties.get(POLICY_DEFAULTED_FIELDS, {})
policy_defaulted_fields.update(
- (k, True)
+ (field_name, True)
for policy in Policies._removed_policies.itervalues()
- for k in Policies._get_config_from_policy(policy) or {}
+ for field_name in Policies._get_config_from_policy(policy) or {}
)
if policies:
for policy in policies.itervalues():
- for k in Policies._get_config_from_policy(policy) or {}:
- if k in policy_defaulted_fields:
- del policy_defaulted_fields[k]
+ for field_name in Policies._get_config_from_policy(policy) or {}:
+ if field_name in policy_defaulted_fields:
+ del policy_defaulted_fields[field_name]
ctx.instance.runtime_properties[POLICY_DEFAULTED_FIELDS] = policy_defaulted_fields
@@ -230,13 +233,25 @@ class Policies(object):
Policies._set_policy_apply_order(policies)
@staticmethod
+ def _get_policy_bodies_dict(policies):
+ """returns a dict of policy_id -> policy_body if policy_body exists"""
+ if not policies:
+ return {}
+
+ return dict((policy_id, policy.get(POLICY_BODY))
+ for policy_id, policy in policies.iteritems() if policy.get(POLICY_BODY)
+ )
+
+ @staticmethod
def gather_policies_to_node(policy_apply_order_path=None):
"""
- decorate with @Policies.gather_policies_to_node to
+ decorate with @Policies.gather_policies_to_node() to
gather the policies from dcae.nodes.policy nodes this node depends on.
Places the policies into runtime_properties["policies"].
+ Stores <scn>:policies data in consul-kv
+
Call Policies.calc_latest_application_config() to apply policies onto application_config.
"""
def gather_policies_decorator(func):
@@ -301,6 +316,8 @@ class Policies(object):
if ctx.type != NODE_INSTANCE:
raise NonRecoverableError("can only invoke gather_policies_to_node on node")
+ policies_outputted = False
+ policy_bodies = []
try:
Policies._store_policy_apply_order_clause(policy_apply_order_path)
@@ -313,12 +330,21 @@ class Policies(object):
Policies._set_policies(policies)
if policy_filters:
ctx.instance.runtime_properties[POLICY_FILTERS] = policy_filters
+
+ policy_bodies = Policies._get_policy_bodies_dict(policies)
+ if policy_bodies:
+ policies_outputted = PoliciesOutput.store_policies(ACTION_GATHERED, policy_bodies)
except Exception as ex:
error = "Failed to set the policies {0}".format(str(ex))
ctx.logger.error("{0}: {1}".format(error, traceback.format_exc()))
raise NonRecoverableError(error)
- return func(*args, **kwargs)
+ func_result = func(*args, **kwargs)
+
+ if not policies_outputted and policy_bodies:
+ PoliciesOutput.store_policies(ACTION_GATHERED, policy_bodies)
+
+ return func_result
return wrapper
return gather_policies_decorator
@@ -349,19 +375,17 @@ class Policies(object):
for policy_id in removed_policies:
removed_policy = policies.get(policy_id)
- if removed_policy:
+ if removed_policy and POLICY_BODY in removed_policy:
Policies._removed_policies[policy_id] = deepcopy(removed_policy)
- if not removed_policy.get(POLICY_PERSISTENT):
- del policies[policy_id]
- elif POLICY_BODY in removed_policy:
+ if removed_policy.get(POLICY_PERSISTENT):
del policies[policy_id][POLICY_BODY]
+ else:
+ del policies[policy_id]
- new_policies = dict((p_id, policy)
+ new_policies = dict((policy_id, policy)
for policy_filter_id in policy_filters
- for (p_id, policy) in added_policies
- .get(policy_filter_id, {})
- .get(POLICIES, {})
- .iteritems())
+ for (policy_id, policy) in added_policies.get(policy_filter_id, {})
+ .get(POLICIES, {}).iteritems())
ctx.logger.info("new_policies: {0}".format(json.dumps(new_policies)))
@@ -373,13 +397,17 @@ class Policies(object):
continue
updated_policies.append(policy)
- skipped = {"ignored" : [], "unexpected" : [], "same" : []}
+ skipped = {"ignored": [], "unexpected": [], "same": [], "duplicate": []}
for policy in updated_policies:
policy_id = policy.get(POLICY_ID)
if not policy_id or policy_id not in policies:
skipped["ignored"].append(policy)
continue
+ if policy_id in Policies._updated_policies:
+ skipped["duplicate"].append(policy)
+ continue
+
updated_policy_body = policy.get(POLICY_BODY, {})
updated_policy_version = updated_policy_body.get(POLICY_VERSION)
if not updated_policy_version or POLICY_CONFIG not in updated_policy_body:
@@ -395,20 +423,25 @@ class Policies(object):
policies[policy_id][POLICY_BODY] = updated_policy_body
Policies._updated_policies[policy_id] = policy
- if skipped["same"] or skipped["ignored"] or skipped["unexpected"]:
- ctx.logger.info("skipped policies: {0}".format(json.dumps(skipped)))
+ if skipped["same"] or skipped["ignored"] or skipped["unexpected"] or skipped["duplicate"]:
+ ctx.logger.info("skipped updates on policies: {0}".format(json.dumps(skipped)))
if Policies._updated_policies or Policies._removed_policies:
Policies._set_policies(policies)
+ policy_bodies = Policies._get_policy_bodies_dict(policies)
+ PoliciesOutput.store_policies(ACTION_UPDATED, policy_bodies)
@staticmethod
- def update_policies_on_node(configs_only=True):
- """decorate each policy_update operation with @Policies.update_policies_on_node to
+ def update_policies_on_node(configs_only=False):
+ """
+ decorate each policy_update operation with @Policies.update_policies_on_node to
filter out the updated_policies to only what applies to the current node instance,
update runtime_properties["policies"]
+ updates <scn>:policies data in consul-kv
+
:configs_only: - set to True if expect to see only the config in updated_policies
- instead of the whole policy object (False)
+ instead of the whole policy_body object (False)
Passes through the filtered list of updated_policies that apply to the current node instance
@@ -439,10 +472,16 @@ class Policies(object):
if configs_only:
updated_policies = Utils.remove_empties(
[Policies._get_config_from_policy(policy)
- for policy in updated_policies or []]
+ for policy in updated_policies]
)
removed_policies = [Policies._get_config_from_policy(policy)
- for policy in removed_policies or []]
+ for policy in removed_policies]
+ else:
+ updated_policies = Utils.remove_empties(
+ [policy.get(POLICY_BODY) for policy in updated_policies]
+ )
+ removed_policies = [policy.get(POLICY_BODY)
+ for policy in removed_policies]
except Exception as ex:
error = "Failed to update the policies {0}".format(str(ex))
@@ -450,16 +489,41 @@ class Policies(object):
raise NonRecoverableError(error)
if updated_policies or removed_policies:
- return func(updated_policies, removed_policies=removed_policies, **kwargs)
+ return func(
+ updated_policies,
+ removed_policies=removed_policies,
+ policies=Policies.get_policy_bodies(),
+ **kwargs)
return wrapper
return update_policies_decorator
@staticmethod
- def _get_ordered_policies(selected_policies=None):
+ def cleanup_policies_on_node(func):
"""
- returns the ordered list of selected policies from the runtime policies
- ordered by policy_id values
+ decorate each policy_update operation with @Policies.cleanup_policies_on_node to
+ remove <scn>:policies data in consul-kv
"""
+ if not func:
+ return
+
+ @wraps(func)
+ def wrapper(**kwargs):
+ """cleanup policies in consul-kv"""
+ if ctx.type != NODE_INSTANCE:
+ raise NonRecoverableError("can only invoke cleanup_policies_on_node on node")
+
+ try:
+ PoliciesOutput.delete_policies()
+ except Exception as ex:
+ error = "Failed to cleanup policies in consul-kv {0}".format(str(ex))
+ ctx.logger.error("{0}: {1}".format(error, traceback.format_exc()))
+
+ return func(**kwargs)
+ return wrapper
+
+ @staticmethod
+ def _get_ordered_policies(selected_policies=None):
+ """returns the ordered list of selected policies from the runtime policies"""
policies = ctx.instance.runtime_properties.get(POLICIES)
apply_order = ctx.instance.runtime_properties.get(POLICY_APPLY_ORDER)
if not policies or not apply_order:
@@ -475,10 +539,7 @@ class Policies(object):
@staticmethod
def get_policy_configs():
- """
- returns the list of policy configs from the runtime policies
- ordered by policy_id values
- """
+ """returns the ordered list of policy configs from the runtime policies"""
if ctx.type != NODE_INSTANCE:
return []
@@ -489,6 +550,13 @@ class Policies(object):
)
@staticmethod
+ def get_policy_bodies():
+ """returns the ordered list of policy_body objects if policy_body exists"""
+ return [policy.get(POLICY_BODY)
+ for policy in Policies._get_ordered_policies()
+ if policy.get(POLICY_BODY)]
+
+ @staticmethod
def shallow_merge_policies_into(config, default_config=None):
"""
shallow merge the :policy configs: (dict) into :config: that is expected to be a dict.
diff --git a/onap-dcae-dcaepolicy-lib/onap_dcae_dcaepolicy_lib/policies_output.py b/onap-dcae-dcaepolicy-lib/onap_dcae_dcaepolicy_lib/policies_output.py
new file mode 100644
index 0000000..7a1a8e8
--- /dev/null
+++ b/onap-dcae-dcaepolicy-lib/onap_dcae_dcaepolicy_lib/policies_output.py
@@ -0,0 +1,131 @@
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+"""client to talk to consul at the standard port 8500 on localhost"""
+
+import base64
+import json
+import urllib
+import uuid
+from datetime import datetime
+
+import requests
+from cloudify import ctx
+
+
+class PoliciesOutput(object):
+ """static class for store-delete policies in consul kv"""
+ # it is safe to assume that consul agent is at localhost:8500 along with cloudify manager
+ CONSUL_TRANSACTION_URL = "http://localhost:8500/v1/txn"
+
+ POLICIES_EVENT = 'policies_event'
+
+ POLICIES_FOLDER_MASK = "{0}:policies/{1}"
+ MAX_OPS_PER_TXN = 64
+ # MAX_VALUE_LEN = 512 * 1000
+
+ OPERATION_SET = "set"
+ OPERATION_DELETE = "delete"
+ OPERATION_DELETE_FOLDER = "delete-tree"
+ SERVICE_COMPONENT_NAME = "service_component_name"
+
+ @staticmethod
+ def _gen_txn_operation(verb, service_component_name, key=None, value=None):
+ """returns the properly formatted operation to be used inside transaction"""
+ key = PoliciesOutput.POLICIES_FOLDER_MASK.format(service_component_name, urllib.quote(key or ""))
+ if value:
+ return {"KV": {"Verb": verb, "Key": key, "Value": base64.b64encode(value)}}
+ return {"KV": {"Verb": verb, "Key": key}}
+
+ @staticmethod
+ def _run_transaction(operation_name, txn):
+ """run a single transaction of several operations at consul /txn"""
+ if not txn:
+ return
+
+ response = None
+ try:
+ response = requests.put(PoliciesOutput.CONSUL_TRANSACTION_URL, json=txn)
+ except requests.exceptions.RequestException as ex:
+ ctx.logger.error("failed to {0} at {1}: {2} on txn={3}"
+ .format(operation_name, PoliciesOutput.CONSUL_TRANSACTION_URL,
+ str(ex), json.dumps(txn)))
+ return
+
+ if response.status_code != requests.codes.ok:
+ ctx.logger.error("failed {0} {1}: {2} text={3} txn={4} headers={5}"
+ .format(operation_name, PoliciesOutput.CONSUL_TRANSACTION_URL, response.status_code,
+ response.text, json.dumps(txn),
+ json.dumps(dict(response.request.headers.items()))))
+ return
+ ctx.logger.info("response for {0} {1}: {2} text={3} txn={4} headers={5}"
+ .format(operation_name, PoliciesOutput.CONSUL_TRANSACTION_URL, response.status_code,
+ response.text, json.dumps(txn),
+ json.dumps(dict(response.request.headers.items()))))
+ return True
+
+ @staticmethod
+ def store_policies(action, policy_bodies):
+ """put the policy_bodies for service_component_name into consul-kv"""
+ service_component_name = ctx.instance.runtime_properties.get(PoliciesOutput.SERVICE_COMPONENT_NAME)
+ if not service_component_name:
+ ctx.logger.warn("failed to find service_component_name to store_policies in consul-kv")
+ return False
+
+ event = {
+ "action": action,
+ "timestamp": (datetime.utcnow().isoformat()[:-3] + 'Z'),
+ "update_id": str(uuid.uuid4()),
+ "policies_count": len(policy_bodies)
+ }
+ ctx.instance.runtime_properties[PoliciesOutput.POLICIES_EVENT] = event
+
+ store_policies = [
+ PoliciesOutput._gen_txn_operation(PoliciesOutput.OPERATION_SET, service_component_name,
+ "items/" + policy_id, json.dumps(policy_body))
+ for policy_id, policy_body in policy_bodies.iteritems()
+ ]
+ txn = [
+ PoliciesOutput._gen_txn_operation(PoliciesOutput.OPERATION_DELETE_FOLDER, service_component_name),
+ PoliciesOutput._gen_txn_operation(PoliciesOutput.OPERATION_SET, service_component_name, "event", json.dumps(event))
+ ]
+ idx_step = PoliciesOutput.MAX_OPS_PER_TXN - len(txn)
+ for idx in xrange(0, len(store_policies), idx_step):
+ txn += store_policies[idx : idx + idx_step]
+ if not PoliciesOutput._run_transaction("store_policies", txn):
+ return False
+ txn = []
+
+ PoliciesOutput._run_transaction("store_policies", txn)
+ return True
+
+ @staticmethod
+ def delete_policies():
+ """delete policies for service_component_name in consul-kv"""
+ if PoliciesOutput.POLICIES_EVENT not in ctx.instance.runtime_properties:
+ return
+
+ service_component_name = ctx.instance.runtime_properties.get(PoliciesOutput.SERVICE_COMPONENT_NAME)
+ if not service_component_name:
+ ctx.logger.warn("failed to find service_component_name to delete_policies in consul-kv")
+ return
+
+ delete_policies = [
+ PoliciesOutput._gen_txn_operation(PoliciesOutput.OPERATION_DELETE_FOLDER, service_component_name)
+ ]
+ PoliciesOutput._run_transaction("delete_policies", delete_policies)