summaryrefslogtreecommitdiffstats
path: root/cdap/cdapplugin/cdapcloudify
diff options
context:
space:
mode:
Diffstat (limited to 'cdap/cdapplugin/cdapcloudify')
-rw-r--r--cdap/cdapplugin/cdapcloudify/cdap_plugin.py167
-rw-r--r--cdap/cdapplugin/cdapcloudify/discovery.py38
2 files changed, 110 insertions, 95 deletions
diff --git a/cdap/cdapplugin/cdapcloudify/cdap_plugin.py b/cdap/cdapplugin/cdapcloudify/cdap_plugin.py
index 00f80e2..d12a23b 100644
--- a/cdap/cdapplugin/cdapcloudify/cdap_plugin.py
+++ b/cdap/cdapplugin/cdapcloudify/cdap_plugin.py
@@ -53,11 +53,11 @@ def _trigger_update(updated_policies):
for p in updated_policies:
ctx.logger.info("Reconfiguring CDAP application via smart interface")
return discovery.reconfigure_in_broker(
- cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
- service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
- config = p,
- reconfiguration_type = "program-flowlet-smart",
- logger = ctx.logger)
+ cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
+ service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
+ config = p,
+ reconfiguration_type = "program-flowlet-smart",
+ logger = ctx.logger)
def _validate_conns(connections):
"""
@@ -111,13 +111,25 @@ def _services_calls_iterator(services_calls):
return for_config
######################
+# Decorators
+######################
+def try_raise_nonr(func):
+ def inner(*args, **kwargs):
+ try:
+ return func(*args, **kwargs)
+ except Exception as e:
+ raise NonRecoverableError(e)
+ return inner
+
+######################
# Cloudify Operations
######################
@operation
+@try_raise_nonr
def create(connected_broker_dns_name, **kwargs):
"""
- This is apparantly needed due to the order in which Cloudify relationships are handled in Cloudify.
+ setup critical runtime properties
"""
#fail fast
@@ -142,53 +154,49 @@ def create(connected_broker_dns_name, **kwargs):
ctx.instance.runtime_properties[SER_C] = _services_calls_iterator(ctx.node.properties["connections"][SERVICES_CALLS])
@operation
+@try_raise_nonr
@Policies.gather_policies_to_node
def deploy_and_start_application(**kwargs):
"""
pushes the application into the workspace and starts it
"""
- try:
- #parse TOSCA model params
- config_template = ctx.node.properties["app_config"]
-
- #there is a typed section in the node type called "connections", but the broker expects those two keys at the top level of app_config, so add them here
- #In cloudify you can't have a custom data type and then specify unknown propertys, the vlidation will fail, so typespeccing just part of app_config doesnt work
- #the rest of the CDAP app's app_config is app-dependent
- config_template[SERVICES_CALLS] = ctx.instance.runtime_properties[SER_C]
- config_template[STREAMS_PUBLISHES] = ctx.instance.runtime_properties[PUB_C]
- config_template[STREAMS_SUBSCRIBES] = ctx.instance.runtime_properties[SUB_C]
+ #parse TOSCA model params
+ config_template = ctx.node.properties["app_config"]
- #register with broker
- ctx.logger.info("Registering with Broker, config template was: {0}".format(json.dumps(config_template)))
- response = discovery.put_broker(
- cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
- service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
- namespace = ctx.node.properties["namespace"],
- streamname = ctx.node.properties["streamname"],
- jar_url = ctx.node.properties["jar_url"],
- artifact_name = ctx.node.properties["artifact_name"],
- artifact_version = ctx.node.properties["artifact_version"],
- app_config = config_template,
- app_preferences = ctx.node.properties["app_preferences"],
- service_endpoints = ctx.node.properties["service_endpoints"],
- programs = ctx.node.properties["programs"],
- program_preferences = ctx.node.properties["program_preferences"],
- logger = ctx.logger)
+ #there is a typed section in the node type called "connections", but the broker expects those two keys at the top level of app_config, so add them here
+ #In cloudify you can't have a custom data type and then specify unknown propertys, the vlidation will fail, so typespeccing just part of app_config doesnt work
+ #the rest of the CDAP app's app_config is app-dependent
+ config_template[SERVICES_CALLS] = ctx.instance.runtime_properties[SER_C]
+ config_template[STREAMS_PUBLISHES] = ctx.instance.runtime_properties[PUB_C]
+ config_template[STREAMS_SUBSCRIBES] = ctx.instance.runtime_properties[SUB_C]
- response.raise_for_status() #bomb if not 2xx
+ #register with broker
+ ctx.logger.info("Registering with Broker, config template was: {0}".format(json.dumps(config_template)))
+ response = discovery.put_broker(
+ cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
+ service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
+ namespace = ctx.node.properties["namespace"],
+ streamname = ctx.node.properties["streamname"],
+ jar_url = ctx.node.properties["jar_url"],
+ artifact_name = ctx.node.properties["artifact_name"],
+ artifact_version = ctx.node.properties["artifact_version"],
+ app_config = config_template,
+ app_preferences = ctx.node.properties["app_preferences"],
+ service_endpoints = ctx.node.properties["service_endpoints"],
+ programs = ctx.node.properties["programs"],
+ program_preferences = ctx.node.properties["program_preferences"],
+ logger = ctx.logger)
- #TODO! Would be better to do an initial merge first before deploying, but the merge is complicated for CDAP
- #because of app config vs. app preferences. So, for now, let the broker do the work with an immediate reconfigure
- #get policies that may have changed prior to this blueprint deployment
- policy_configs = Policies.get_policy_configs()
- if policy_configs is not None:
- ctx.logger.info("Updated policy configs: {0}".format(policy_configs))
- response = _trigger_update(policy_configs)
- response.raise_for_status() #bomb if not 2xx
+ response.raise_for_status() #bomb if not 2xx
- except Exception as e:
- ctx.logger.error("Error depploying CDAP app: {er}".format(er=e))
- raise NonRecoverableError(e)
+ #TODO! Would be better to do an initial merge first before deploying, but the merge is complicated for CDAP
+ #because of app config vs. app preferences. So, for now, let the broker do the work with an immediate reconfigure
+ #get policies that may have changed prior to this blueprint deployment
+ policy_configs = Policies.get_policy_configs()
+ if policy_configs is not None:
+ ctx.logger.info("Updated policy configs: {0}".format(policy_configs))
+ response = _trigger_update(policy_configs)
+ response.raise_for_status() #bomb if not 2xx
@operation
def stop_and_undeploy_application(**kwargs):
@@ -196,9 +204,10 @@ def stop_and_undeploy_application(**kwargs):
#bombing would also bomb the deletion of the rest of the blueprint
ctx.logger.info("Undeploying CDAP application")
try: #deregister with the broker, which will also take down the service from consul
- response = discovery.delete_on_broker(ctx.instance.runtime_properties[SELECTED_BROKER],
- ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
- ctx.logger)
+ response = discovery.delete_on_broker(
+ cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
+ service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
+ logger = ctx.logger)
response.raise_for_status() #bomb if not 2xx
except Exception as e:
ctx.logger.error("Error deregistering from Broker, but continuing with deletion process: {0}".format(e))
@@ -212,7 +221,9 @@ def delete_all_registered_apps(connected_broker_dns_name, **kwargs):
"""
ctx.logger.info("Undeploying CDAP application")
try:
- response = discovery.delete_all_registered_apps(connected_broker_dns_name, ctx.logger)
+ response = discovery.delete_all_registered_apps(
+ cdap_broker_name = connected_broker_dns_name,
+ logger = ctx.logger)
response.raise_for_status() #bomb if not 2xx
except Exception as e:
ctx.logger.error("Error deregistering from Broker, but continuing with deletion process: {0}".format(e))
@@ -225,59 +236,53 @@ def delete_all_registered_apps(connected_broker_dns_name, **kwargs):
# 3) broker deals with the rest
############
@operation
+@try_raise_nonr
def app_config_reconfigure(new_config_template, **kwargs):
"""
reconfigure the CDAP app's app config
"""
- try:
- ctx.logger.info("Reconfiguring CDAP application via app_config")
- response = discovery.reconfigure_in_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
- service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
- config = new_config_template, #This keyname will likely change per policy handler
- reconfiguration_type = "program-flowlet-app-config",
- logger = ctx.logger)
- response.raise_for_status() #bomb if not 2xx
- except Exception as e:
- raise NonRecoverableError("CDAP Reconfigure error: {0}".format(e))
+ ctx.logger.info("Reconfiguring CDAP application via app_config")
+ response = discovery.reconfigure_in_broker(
+ cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
+ service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
+ config = new_config_template, #This keyname will likely change per policy handler
+ reconfiguration_type = "program-flowlet-app-config",
+ logger = ctx.logger)
+ response.raise_for_status() #bomb if not 2xx
@operation
+@try_raise_nonr
def app_preferences_reconfigure(new_config_template, **kwargs):
"""
reconfigure the CDAP app's app preferences
"""
- try:
- ctx.logger.info("Reconfiguring CDAP application via app_preferences")
- response = discovery.reconfigure_in_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
- service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
- config = new_config_template, #This keyname will likely change per policy handler
- reconfiguration_type = "program-flowlet-app-preferences",
- logger = ctx.logger)
- response.raise_for_status() #bomb if not 2xx
- except Exception as e:
- raise NonRecoverableError("CDAP Reconfigure error: {0}".format(e))
+ ctx.logger.info("Reconfiguring CDAP application via app_preferences")
+ response = discovery.reconfigure_in_broker(
+ cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
+ service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
+ config = new_config_template, #This keyname will likely change per policy handler
+ reconfiguration_type = "program-flowlet-app-preferences",
+ logger = ctx.logger)
+ response.raise_for_status() #bomb if not 2xx
@operation
+@try_raise_nonr
def app_smart_reconfigure(new_config_template, **kwargs):
"""
reconfigure the CDAP app via the broker smart interface
"""
- try:
- ctx.logger.info("Reconfiguring CDAP application via smart interface")
- response = _trigger_update([new_config_template])
- response.raise_for_status() #bomb if not 2xx
- except Exception as e:
- raise NonRecoverableError("CDAP Reconfigure error: {0}".format(e))
+ ctx.logger.info("Reconfiguring CDAP application via smart interface")
+ response = _trigger_update([new_config_template])
+ response.raise_for_status() #bomb if not 2xx
@operation
+@try_raise_nonr
@Policies.update_policies_on_node(configs_only=True)
def policy_update(updated_policies, **kwargs):
#its already develiered through policy
ctx.logger.info("Policy update recieved. updated policies: {0}".format(updated_policies))
- try:
- #TODO! In the future, if we really have many different policies, would be more efficient to do a single merge here.
- #However all use cases today are a single policy so OK with this for loop for now.
- response = _trigger_update(updated_policies)
- response.raise_for_status() #bomb if not 2xx
- except Exception as e:
- raise NonRecoverableError("CDAP Reconfigure error: {0}".format(e))
+ #TODO! In the future, if we really have many different policies, would be more efficient to do a single merge here.
+ #However all use cases today are a single policy so OK with this for loop for now.
+ response = _trigger_update(updated_policies)
+ response.raise_for_status() #bomb if not 2xx
diff --git a/cdap/cdapplugin/cdapcloudify/discovery.py b/cdap/cdapplugin/cdapcloudify/discovery.py
index 12daebc..e20258f 100644
--- a/cdap/cdapplugin/cdapcloudify/discovery.py
+++ b/cdap/cdapplugin/cdapcloudify/discovery.py
@@ -44,8 +44,25 @@ def _get_broker_url(cdap_broker_name, service_component_name, logger):
return broker_url
"""
+decorators
+"""
+def run_response(func):
+ """
+ decorator for generic http call, log the response, and return the flask response
+
+ make sure you call the functons below using logger as a kwarg!
+ """
+ def inner(*args, **kwargs):
+ logger = kwargs["logger"]
+ response = func(*args, **kwargs)
+ logger.info((response, response.status_code, response.text))
+ return response #let the caller deal with the response
+ return inner
+
+"""
public
"""
+@run_response
def put_broker(cdap_broker_name,
service_component_name,
namespace,
@@ -77,13 +94,11 @@ def put_broker(cdap_broker_name,
data["program_preferences"] = program_preferences
#register with the broker
- response = requests.put(_get_broker_url(cdap_broker_name, service_component_name, logger),
+ return requests.put(_get_broker_url(cdap_broker_name, service_component_name, logger),
json = data,
headers = {'content-type':'application/json'})
- logger.info((response, response.status_code, response.text))
-
- return response #let the caller deal with the response
+@run_response
def reconfigure_in_broker(cdap_broker_name,
service_component_name,
config,
@@ -91,20 +106,17 @@ def reconfigure_in_broker(cdap_broker_name,
logger):
#trigger a reconfiguration with the broker
#man am I glad I broke the broker API from 3 to 4 to standardize this interface because now I only need one function here
- response = requests.put("{u}/reconfigure".format(u = _get_broker_url(cdap_broker_name, service_component_name, logger)),
+ return requests.put("{u}/reconfigure".format(u = _get_broker_url(cdap_broker_name, service_component_name, logger)),
headers = {'content-type':'application/json'},
json = {"reconfiguration_type" : reconfiguration_type,
"config" : config})
- logger.info((response, response.status_code, response.text))
-
- return response #let the caller deal with the response
+@run_response
def delete_on_broker(cdap_broker_name, service_component_name, logger):
#deregister with the broker
- response = requests.delete(_get_broker_url(cdap_broker_name, service_component_name, logger))
- logger.info((response, response.status_code, response.text))
- return response
+ return requests.delete(_get_broker_url(cdap_broker_name, service_component_name, logger))
+@run_response
def delete_all_registered_apps(cdap_broker_name, logger):
#get the broker connection
broker_ip, broker_port = _get_connection_info_from_consul(cdap_broker_name, logger)
@@ -114,9 +126,7 @@ def delete_all_registered_apps(cdap_broker_name, logger):
logger.info("Trying to connect to broker called {0} at {1}".format(cdap_broker_name, broker_url))
registered_apps = json.loads(requests.get("{0}/application".format(broker_url)).text) #should be proper list of strings (appnames)
logger.info("Trying to delete: {0}".format(registered_apps))
- response = requests.post("{0}/application/delete".format(broker_url),
+ return requests.post("{0}/application/delete".format(broker_url),
headers = {'content-type':'application/json'},
json = {"appnames" : registered_apps})
- logger.info("Response: {0}, Response Status: {1}".format(response.text, response.status_code))
- return response