summaryrefslogtreecommitdiffstats
path: root/dcae-policy/dcaepolicyplugin
diff options
context:
space:
mode:
Diffstat (limited to 'dcae-policy/dcaepolicyplugin')
-rw-r--r--dcae-policy/dcaepolicyplugin/discovery.py51
-rw-r--r--dcae-policy/dcaepolicyplugin/tasks.py41
2 files changed, 73 insertions, 19 deletions
diff --git a/dcae-policy/dcaepolicyplugin/discovery.py b/dcae-policy/dcaepolicyplugin/discovery.py
index 45a061b..1faee08 100644
--- a/dcae-policy/dcaepolicyplugin/discovery.py
+++ b/dcae-policy/dcaepolicyplugin/discovery.py
@@ -18,25 +18,54 @@
"""client to talk to consul on standard port 8500"""
-import requests
+import base64
+import json
+import requests
from cloudify import ctx
# it is safe to assume that consul agent is at localhost:8500 along with cloudify manager
CONSUL_SERVICE_URL = "http://localhost:8500/v1/catalog/service/{0}"
+CONSUL_KV_MASK = "http://localhost:8500/v1/kv/{0}"
+
def discover_service_url(service_name):
"""find the service record in consul"""
- service_url_url = CONSUL_SERVICE_URL.format(service_name)
- ctx.logger.info("getting service_url at {0}".format(service_url_url))
+ service_url = CONSUL_SERVICE_URL.format(service_name)
+ ctx.logger.info("getting service_url at {0}".format(service_url))
+
+ response = requests.get(service_url)
+
+ ctx.logger.info("got {0} for service_url at {1} response: {2}"
+ .format(response.status_code, service_url, response.text))
+
+ if response.status_code != requests.codes.ok:
+ return
+
+ resp_json = response.json()
+ if resp_json:
+ service = resp_json[0]
+ return "http://{0}:{1}".format(service["ServiceAddress"], service["ServicePort"])
+
+
+def discover_value(key):
+ """get the value for the key from consul-kv"""
+ kv_url = CONSUL_KV_MASK.format(key)
+ ctx.logger.info("getting kv at {0}".format(kv_url))
+
+ response = requests.get(kv_url)
- response = requests.get(service_url_url)
+ ctx.logger.info("got {0} for kv at {1} response: {2}"
+ .format(response.status_code, kv_url, response.text))
- ctx.logger.info("got service_url at {0} status({1}) response: {2}"
- .format(service_url_url, response.status_code, response.text))
+ if response.status_code != requests.codes.ok:
+ return
- if response.status_code == requests.codes.ok:
- resp_json = response.json()
- if resp_json:
- service = resp_json[0]
- return "http://{0}:{1}".format(service["ServiceAddress"], service["ServicePort"])
+ data = response.json()
+ if not data:
+ ctx.logger.error("failed discover_value %s", key)
+ return
+ value = base64.b64decode(data[0]["Value"]).decode("utf-8")
+ ctx.logger.info("consul-kv key=%s value(%s) data=%s",
+ key, value, json.dumps(data))
+ return json.loads(value)
diff --git a/dcae-policy/dcaepolicyplugin/tasks.py b/dcae-policy/dcaepolicyplugin/tasks.py
index b3a29aa..bbf3ec1 100644
--- a/dcae-policy/dcaepolicyplugin/tasks.py
+++ b/dcae-policy/dcaepolicyplugin/tasks.py
@@ -18,19 +18,20 @@
"""tasks are the cloudify operations invoked on interfaces defined in the blueprint"""
-import json
-import uuid
import copy
+import json
import traceback
-import requests
+import uuid
+import requests
from cloudify import ctx
-from cloudify.decorators import operation
from cloudify.context import NODE_INSTANCE
+from cloudify.decorators import operation
from cloudify.exceptions import NonRecoverableError
-from .discovery import discover_service_url
+from .discovery import discover_service_url, discover_value
+DCAE_POLICY_PLUGIN = "dcaepolicyplugin"
POLICY_ID = 'policy_id'
POLICY_REQUIRED = 'policy_required'
POLICY_BODY = 'policy_body'
@@ -45,6 +46,7 @@ DCAE_POLICIES_TYPE = 'dcae.nodes.policies'
DCAE_POLICY_TYPES = [DCAE_POLICY_TYPE, DCAE_POLICIES_TYPE]
CONFIG_ATTRIBUTES = "configAttributes"
+
class PolicyHandler(object):
"""talk to policy-handler"""
SERVICE_NAME_POLICY_HANDLER = "policy_handler"
@@ -60,8 +62,27 @@ class PolicyHandler(object):
return
PolicyHandler._url = discover_service_url(PolicyHandler.SERVICE_NAME_POLICY_HANDLER)
- if not PolicyHandler._url:
- PolicyHandler._url = PolicyHandler.DEFAULT_URL
+ if PolicyHandler._url:
+ return
+
+ config = discover_value(DCAE_POLICY_PLUGIN)
+ if config and isinstance(config, dict):
+ # expected structure for the config value for dcaepolicyplugin key
+ # {
+ # "dcaepolicyplugin" : {
+ # "policy_handler" : {
+ # "target_entity" : "policy_handler",
+ # "url" : "http://policy-handler:25577"
+ # }
+ # }
+ # }
+ PolicyHandler._url = config.get(DCAE_POLICY_PLUGIN, {}) \
+ .get(PolicyHandler.SERVICE_NAME_POLICY_HANDLER, {}).get("url")
+
+ if PolicyHandler._url:
+ return
+
+ PolicyHandler._url = PolicyHandler.DEFAULT_URL
@staticmethod
def get_latest_policy(policy_id):
@@ -93,7 +114,7 @@ class PolicyHandler(object):
PolicyHandler.X_ECOMP_REQUESTID: policy_filter.get(REQUEST_ID, str(uuid.uuid4()))
}
- ctx.logger.info("finding the latest polices from {0} by {1} headers={2}".format( \
+ ctx.logger.info("finding the latest polices from {0} by {1} headers={2}".format(
ph_path, json.dumps(policy_filter), json.dumps(headers)))
res = requests.post(ph_path, json=policy_filter, headers=headers)
@@ -106,6 +127,7 @@ class PolicyHandler(object):
res.raise_for_status()
return res.json().get(LATEST_POLICIES)
+
def _policy_get():
"""
dcae.nodes.policy -
@@ -143,6 +165,7 @@ def _policy_get():
ctx.instance.runtime_properties[POLICY_BODY] = policy[POLICY_BODY]
return True
+
def _fix_policy_filter(policy_filter):
if CONFIG_ATTRIBUTES in policy_filter:
config_attributes = policy_filter.get(CONFIG_ATTRIBUTES)
@@ -159,6 +182,7 @@ def _fix_policy_filter(policy_filter):
ctx.logger.warn("unexpected %s: %s", CONFIG_ATTRIBUTES, config_attributes)
del policy_filter[CONFIG_ATTRIBUTES]
+
def _policies_find():
"""
dcae.nodes.policies -
@@ -195,6 +219,7 @@ def _policies_find():
return True
+
#########################################################
@operation
def policy_get(**kwargs):