summaryrefslogtreecommitdiffstats
path: root/cdap/cdapplugin/cdapcloudify/cdap_plugin.py
diff options
context:
space:
mode:
Diffstat (limited to 'cdap/cdapplugin/cdapcloudify/cdap_plugin.py')
-rw-r--r--cdap/cdapplugin/cdapcloudify/cdap_plugin.py167
1 files changed, 86 insertions, 81 deletions
diff --git a/cdap/cdapplugin/cdapcloudify/cdap_plugin.py b/cdap/cdapplugin/cdapcloudify/cdap_plugin.py
index 00f80e2..d12a23b 100644
--- a/cdap/cdapplugin/cdapcloudify/cdap_plugin.py
+++ b/cdap/cdapplugin/cdapcloudify/cdap_plugin.py
@@ -53,11 +53,11 @@ def _trigger_update(updated_policies):
for p in updated_policies:
ctx.logger.info("Reconfiguring CDAP application via smart interface")
return discovery.reconfigure_in_broker(
- cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
- service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
- config = p,
- reconfiguration_type = "program-flowlet-smart",
- logger = ctx.logger)
+ cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
+ service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
+ config = p,
+ reconfiguration_type = "program-flowlet-smart",
+ logger = ctx.logger)
def _validate_conns(connections):
"""
@@ -111,13 +111,25 @@ def _services_calls_iterator(services_calls):
return for_config
######################
+# Decorators
+######################
+def try_raise_nonr(func):
+ def inner(*args, **kwargs):
+ try:
+ return func(*args, **kwargs)
+ except Exception as e:
+ raise NonRecoverableError(e)
+ return inner
+
+######################
# Cloudify Operations
######################
@operation
+@try_raise_nonr
def create(connected_broker_dns_name, **kwargs):
"""
- This is apparantly needed due to the order in which Cloudify relationships are handled in Cloudify.
+ setup critical runtime properties
"""
#fail fast
@@ -142,53 +154,49 @@ def create(connected_broker_dns_name, **kwargs):
ctx.instance.runtime_properties[SER_C] = _services_calls_iterator(ctx.node.properties["connections"][SERVICES_CALLS])
@operation
+@try_raise_nonr
@Policies.gather_policies_to_node
def deploy_and_start_application(**kwargs):
"""
pushes the application into the workspace and starts it
"""
- try:
- #parse TOSCA model params
- config_template = ctx.node.properties["app_config"]
-
- #there is a typed section in the node type called "connections", but the broker expects those two keys at the top level of app_config, so add them here
- #In cloudify you can't have a custom data type and then specify unknown propertys, the vlidation will fail, so typespeccing just part of app_config doesnt work
- #the rest of the CDAP app's app_config is app-dependent
- config_template[SERVICES_CALLS] = ctx.instance.runtime_properties[SER_C]
- config_template[STREAMS_PUBLISHES] = ctx.instance.runtime_properties[PUB_C]
- config_template[STREAMS_SUBSCRIBES] = ctx.instance.runtime_properties[SUB_C]
+ #parse TOSCA model params
+ config_template = ctx.node.properties["app_config"]
- #register with broker
- ctx.logger.info("Registering with Broker, config template was: {0}".format(json.dumps(config_template)))
- response = discovery.put_broker(
- cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
- service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
- namespace = ctx.node.properties["namespace"],
- streamname = ctx.node.properties["streamname"],
- jar_url = ctx.node.properties["jar_url"],
- artifact_name = ctx.node.properties["artifact_name"],
- artifact_version = ctx.node.properties["artifact_version"],
- app_config = config_template,
- app_preferences = ctx.node.properties["app_preferences"],
- service_endpoints = ctx.node.properties["service_endpoints"],
- programs = ctx.node.properties["programs"],
- program_preferences = ctx.node.properties["program_preferences"],
- logger = ctx.logger)
+ #there is a typed section in the node type called "connections", but the broker expects those two keys at the top level of app_config, so add them here
+ #In cloudify you can't have a custom data type and then specify unknown propertys, the vlidation will fail, so typespeccing just part of app_config doesnt work
+ #the rest of the CDAP app's app_config is app-dependent
+ config_template[SERVICES_CALLS] = ctx.instance.runtime_properties[SER_C]
+ config_template[STREAMS_PUBLISHES] = ctx.instance.runtime_properties[PUB_C]
+ config_template[STREAMS_SUBSCRIBES] = ctx.instance.runtime_properties[SUB_C]
- response.raise_for_status() #bomb if not 2xx
+ #register with broker
+ ctx.logger.info("Registering with Broker, config template was: {0}".format(json.dumps(config_template)))
+ response = discovery.put_broker(
+ cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
+ service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
+ namespace = ctx.node.properties["namespace"],
+ streamname = ctx.node.properties["streamname"],
+ jar_url = ctx.node.properties["jar_url"],
+ artifact_name = ctx.node.properties["artifact_name"],
+ artifact_version = ctx.node.properties["artifact_version"],
+ app_config = config_template,
+ app_preferences = ctx.node.properties["app_preferences"],
+ service_endpoints = ctx.node.properties["service_endpoints"],
+ programs = ctx.node.properties["programs"],
+ program_preferences = ctx.node.properties["program_preferences"],
+ logger = ctx.logger)
- #TODO! Would be better to do an initial merge first before deploying, but the merge is complicated for CDAP
- #because of app config vs. app preferences. So, for now, let the broker do the work with an immediate reconfigure
- #get policies that may have changed prior to this blueprint deployment
- policy_configs = Policies.get_policy_configs()
- if policy_configs is not None:
- ctx.logger.info("Updated policy configs: {0}".format(policy_configs))
- response = _trigger_update(policy_configs)
- response.raise_for_status() #bomb if not 2xx
+ response.raise_for_status() #bomb if not 2xx
- except Exception as e:
- ctx.logger.error("Error depploying CDAP app: {er}".format(er=e))
- raise NonRecoverableError(e)
+ #TODO! Would be better to do an initial merge first before deploying, but the merge is complicated for CDAP
+ #because of app config vs. app preferences. So, for now, let the broker do the work with an immediate reconfigure
+ #get policies that may have changed prior to this blueprint deployment
+ policy_configs = Policies.get_policy_configs()
+ if policy_configs is not None:
+ ctx.logger.info("Updated policy configs: {0}".format(policy_configs))
+ response = _trigger_update(policy_configs)
+ response.raise_for_status() #bomb if not 2xx
@operation
def stop_and_undeploy_application(**kwargs):
@@ -196,9 +204,10 @@ def stop_and_undeploy_application(**kwargs):
#bombing would also bomb the deletion of the rest of the blueprint
ctx.logger.info("Undeploying CDAP application")
try: #deregister with the broker, which will also take down the service from consul
- response = discovery.delete_on_broker(ctx.instance.runtime_properties[SELECTED_BROKER],
- ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
- ctx.logger)
+ response = discovery.delete_on_broker(
+ cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
+ service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
+ logger = ctx.logger)
response.raise_for_status() #bomb if not 2xx
except Exception as e:
ctx.logger.error("Error deregistering from Broker, but continuing with deletion process: {0}".format(e))
@@ -212,7 +221,9 @@ def delete_all_registered_apps(connected_broker_dns_name, **kwargs):
"""
ctx.logger.info("Undeploying CDAP application")
try:
- response = discovery.delete_all_registered_apps(connected_broker_dns_name, ctx.logger)
+ response = discovery.delete_all_registered_apps(
+ cdap_broker_name = connected_broker_dns_name,
+ logger = ctx.logger)
response.raise_for_status() #bomb if not 2xx
except Exception as e:
ctx.logger.error("Error deregistering from Broker, but continuing with deletion process: {0}".format(e))
@@ -225,59 +236,53 @@ def delete_all_registered_apps(connected_broker_dns_name, **kwargs):
# 3) broker deals with the rest
############
@operation
+@try_raise_nonr
def app_config_reconfigure(new_config_template, **kwargs):
"""
reconfigure the CDAP app's app config
"""
- try:
- ctx.logger.info("Reconfiguring CDAP application via app_config")
- response = discovery.reconfigure_in_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
- service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
- config = new_config_template, #This keyname will likely change per policy handler
- reconfiguration_type = "program-flowlet-app-config",
- logger = ctx.logger)
- response.raise_for_status() #bomb if not 2xx
- except Exception as e:
- raise NonRecoverableError("CDAP Reconfigure error: {0}".format(e))
+ ctx.logger.info("Reconfiguring CDAP application via app_config")
+ response = discovery.reconfigure_in_broker(
+ cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
+ service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
+ config = new_config_template, #This keyname will likely change per policy handler
+ reconfiguration_type = "program-flowlet-app-config",
+ logger = ctx.logger)
+ response.raise_for_status() #bomb if not 2xx
@operation
+@try_raise_nonr
def app_preferences_reconfigure(new_config_template, **kwargs):
"""
reconfigure the CDAP app's app preferences
"""
- try:
- ctx.logger.info("Reconfiguring CDAP application via app_preferences")
- response = discovery.reconfigure_in_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
- service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
- config = new_config_template, #This keyname will likely change per policy handler
- reconfiguration_type = "program-flowlet-app-preferences",
- logger = ctx.logger)
- response.raise_for_status() #bomb if not 2xx
- except Exception as e:
- raise NonRecoverableError("CDAP Reconfigure error: {0}".format(e))
+ ctx.logger.info("Reconfiguring CDAP application via app_preferences")
+ response = discovery.reconfigure_in_broker(
+ cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
+ service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
+ config = new_config_template, #This keyname will likely change per policy handler
+ reconfiguration_type = "program-flowlet-app-preferences",
+ logger = ctx.logger)
+ response.raise_for_status() #bomb if not 2xx
@operation
+@try_raise_nonr
def app_smart_reconfigure(new_config_template, **kwargs):
"""
reconfigure the CDAP app via the broker smart interface
"""
- try:
- ctx.logger.info("Reconfiguring CDAP application via smart interface")
- response = _trigger_update([new_config_template])
- response.raise_for_status() #bomb if not 2xx
- except Exception as e:
- raise NonRecoverableError("CDAP Reconfigure error: {0}".format(e))
+ ctx.logger.info("Reconfiguring CDAP application via smart interface")
+ response = _trigger_update([new_config_template])
+ response.raise_for_status() #bomb if not 2xx
@operation
+@try_raise_nonr
@Policies.update_policies_on_node(configs_only=True)
def policy_update(updated_policies, **kwargs):
#its already develiered through policy
ctx.logger.info("Policy update recieved. updated policies: {0}".format(updated_policies))
- try:
- #TODO! In the future, if we really have many different policies, would be more efficient to do a single merge here.
- #However all use cases today are a single policy so OK with this for loop for now.
- response = _trigger_update(updated_policies)
- response.raise_for_status() #bomb if not 2xx
- except Exception as e:
- raise NonRecoverableError("CDAP Reconfigure error: {0}".format(e))
+ #TODO! In the future, if we really have many different policies, would be more efficient to do a single merge here.
+ #However all use cases today are a single policy so OK with this for loop for now.
+ response = _trigger_update(updated_policies)
+ response.raise_for_status() #bomb if not 2xx