diff options
author | Alex Shatov <alexs@att.com> | 2018-03-27 17:12:31 -0400 |
---|---|---|
committer | Alex Shatov <alexs@att.com> | 2018-03-27 17:12:31 -0400 |
commit | 768c68b6fa25ab2faa9f7dbffdae2cbb2bd6e218 (patch) | |
tree | cef25ca671633f9e4e2d6b784cd5ad8b1970d0dd /dcae-policy/dcaepolicyplugin | |
parent | 342c2890e9b46b483244773b2337c8f56ae9bd0d (diff) |
2.2.1 dcaepolicyplugin and data types
- trying to avoid changing code for k8s deployment of
policy-handler with unknown url to MSB or policy-handler
at the moment
- expecting optional manual population of the consul-kv
with config data for dcaepolicyplugin
- when not found service for policy-handler in consul
-- try finding config for "dcaepolicyplugin" in consul-kv
-- the config structure is expected to contain
url to policy_handler
- example of config value for key=dcaepolicyplugin:
{
"dcaepolicyplugin" : {
"policy_handler" : {
"url" : "http://policy-handler:25577"
}
}
}
- still drop down to hardcoded default when this config
not found in consul-kv
- added and refactored unit tests for discovery -- coverage 78%
- making code more PEP8 compliant
Change-Id: Ia176b54ed62631baa30d614785d1937023408ddf
Signed-off-by: Alex Shatov <alexs@att.com>
Issue-ID: DCAEGEN2-419
Diffstat (limited to 'dcae-policy/dcaepolicyplugin')
-rw-r--r-- | dcae-policy/dcaepolicyplugin/discovery.py | 51 | ||||
-rw-r--r-- | dcae-policy/dcaepolicyplugin/tasks.py | 41 |
2 files changed, 73 insertions, 19 deletions
diff --git a/dcae-policy/dcaepolicyplugin/discovery.py b/dcae-policy/dcaepolicyplugin/discovery.py index 45a061b..1faee08 100644 --- a/dcae-policy/dcaepolicyplugin/discovery.py +++ b/dcae-policy/dcaepolicyplugin/discovery.py @@ -18,25 +18,54 @@ """client to talk to consul on standard port 8500""" -import requests +import base64 +import json +import requests from cloudify import ctx # it is safe to assume that consul agent is at localhost:8500 along with cloudify manager CONSUL_SERVICE_URL = "http://localhost:8500/v1/catalog/service/{0}" +CONSUL_KV_MASK = "http://localhost:8500/v1/kv/{0}" + def discover_service_url(service_name): """find the service record in consul""" - service_url_url = CONSUL_SERVICE_URL.format(service_name) - ctx.logger.info("getting service_url at {0}".format(service_url_url)) + service_url = CONSUL_SERVICE_URL.format(service_name) + ctx.logger.info("getting service_url at {0}".format(service_url)) + + response = requests.get(service_url) + + ctx.logger.info("got {0} for service_url at {1} response: {2}" + .format(response.status_code, service_url, response.text)) + + if response.status_code != requests.codes.ok: + return + + resp_json = response.json() + if resp_json: + service = resp_json[0] + return "http://{0}:{1}".format(service["ServiceAddress"], service["ServicePort"]) + + +def discover_value(key): + """get the value for the key from consul-kv""" + kv_url = CONSUL_KV_MASK.format(key) + ctx.logger.info("getting kv at {0}".format(kv_url)) + + response = requests.get(kv_url) - response = requests.get(service_url_url) + ctx.logger.info("got {0} for kv at {1} response: {2}" + .format(response.status_code, kv_url, response.text)) - ctx.logger.info("got service_url at {0} status({1}) response: {2}" - .format(service_url_url, response.status_code, response.text)) + if response.status_code != requests.codes.ok: + return - if response.status_code == requests.codes.ok: - resp_json = response.json() - if resp_json: - service = resp_json[0] - return "http://{0}:{1}".format(service["ServiceAddress"], service["ServicePort"]) + data = response.json() + if not data: + ctx.logger.error("failed discover_value %s", key) + return + value = base64.b64decode(data[0]["Value"]).decode("utf-8") + ctx.logger.info("consul-kv key=%s value(%s) data=%s", + key, value, json.dumps(data)) + return json.loads(value) diff --git a/dcae-policy/dcaepolicyplugin/tasks.py b/dcae-policy/dcaepolicyplugin/tasks.py index b3a29aa..bbf3ec1 100644 --- a/dcae-policy/dcaepolicyplugin/tasks.py +++ b/dcae-policy/dcaepolicyplugin/tasks.py @@ -18,19 +18,20 @@ """tasks are the cloudify operations invoked on interfaces defined in the blueprint""" -import json -import uuid import copy +import json import traceback -import requests +import uuid +import requests from cloudify import ctx -from cloudify.decorators import operation from cloudify.context import NODE_INSTANCE +from cloudify.decorators import operation from cloudify.exceptions import NonRecoverableError -from .discovery import discover_service_url +from .discovery import discover_service_url, discover_value +DCAE_POLICY_PLUGIN = "dcaepolicyplugin" POLICY_ID = 'policy_id' POLICY_REQUIRED = 'policy_required' POLICY_BODY = 'policy_body' @@ -45,6 +46,7 @@ DCAE_POLICIES_TYPE = 'dcae.nodes.policies' DCAE_POLICY_TYPES = [DCAE_POLICY_TYPE, DCAE_POLICIES_TYPE] CONFIG_ATTRIBUTES = "configAttributes" + class PolicyHandler(object): """talk to policy-handler""" SERVICE_NAME_POLICY_HANDLER = "policy_handler" @@ -60,8 +62,27 @@ class PolicyHandler(object): return PolicyHandler._url = discover_service_url(PolicyHandler.SERVICE_NAME_POLICY_HANDLER) - if not PolicyHandler._url: - PolicyHandler._url = PolicyHandler.DEFAULT_URL + if PolicyHandler._url: + return + + config = discover_value(DCAE_POLICY_PLUGIN) + if config and isinstance(config, dict): + # expected structure for the config value for dcaepolicyplugin key + # { + # "dcaepolicyplugin" : { + # "policy_handler" : { + # "target_entity" : "policy_handler", + # "url" : "http://policy-handler:25577" + # } + # } + # } + PolicyHandler._url = config.get(DCAE_POLICY_PLUGIN, {}) \ + .get(PolicyHandler.SERVICE_NAME_POLICY_HANDLER, {}).get("url") + + if PolicyHandler._url: + return + + PolicyHandler._url = PolicyHandler.DEFAULT_URL @staticmethod def get_latest_policy(policy_id): @@ -93,7 +114,7 @@ class PolicyHandler(object): PolicyHandler.X_ECOMP_REQUESTID: policy_filter.get(REQUEST_ID, str(uuid.uuid4())) } - ctx.logger.info("finding the latest polices from {0} by {1} headers={2}".format( \ + ctx.logger.info("finding the latest polices from {0} by {1} headers={2}".format( ph_path, json.dumps(policy_filter), json.dumps(headers))) res = requests.post(ph_path, json=policy_filter, headers=headers) @@ -106,6 +127,7 @@ class PolicyHandler(object): res.raise_for_status() return res.json().get(LATEST_POLICIES) + def _policy_get(): """ dcae.nodes.policy - @@ -143,6 +165,7 @@ def _policy_get(): ctx.instance.runtime_properties[POLICY_BODY] = policy[POLICY_BODY] return True + def _fix_policy_filter(policy_filter): if CONFIG_ATTRIBUTES in policy_filter: config_attributes = policy_filter.get(CONFIG_ATTRIBUTES) @@ -159,6 +182,7 @@ def _fix_policy_filter(policy_filter): ctx.logger.warn("unexpected %s: %s", CONFIG_ATTRIBUTES, config_attributes) del policy_filter[CONFIG_ATTRIBUTES] + def _policies_find(): """ dcae.nodes.policies - @@ -195,6 +219,7 @@ def _policies_find(): return True + ######################################################### @operation def policy_get(**kwargs): |