diff options
Diffstat (limited to 'cdap/cdapplugin/cdapcloudify')
-rw-r--r-- | cdap/cdapplugin/cdapcloudify/cdap_plugin.py | 74 | ||||
-rw-r--r-- | cdap/cdapplugin/cdapcloudify/discovery.py | 45 |
2 files changed, 69 insertions, 50 deletions
diff --git a/cdap/cdapplugin/cdapcloudify/cdap_plugin.py b/cdap/cdapplugin/cdapcloudify/cdap_plugin.py index 6f4134e..00f80e2 100644 --- a/cdap/cdapplugin/cdapcloudify/cdap_plugin.py +++ b/cdap/cdapplugin/cdapcloudify/cdap_plugin.py @@ -27,6 +27,7 @@ import uuid import re from cdapcloudify import discovery import json +import requests # Property keys SERVICE_COMPONENT_NAME = "service_component_name" @@ -159,26 +160,31 @@ def deploy_and_start_application(**kwargs): #register with broker ctx.logger.info("Registering with Broker, config template was: {0}".format(json.dumps(config_template))) - discovery.put_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER], - service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME], - namespace = ctx.node.properties["namespace"], - streamname = ctx.node.properties["streamname"], - jar_url = ctx.node.properties["jar_url"], - artifact_name = ctx.node.properties["artifact_name"], - artifact_version = ctx.node.properties["artifact_version"], - app_config = config_template, - app_preferences = ctx.node.properties["app_preferences"], - service_endpoints = ctx.node.properties["service_endpoints"], - programs = ctx.node.properties["programs"], - program_preferences = ctx.node.properties["program_preferences"], - logger = ctx.logger) + response = discovery.put_broker( + cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER], + service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME], + namespace = ctx.node.properties["namespace"], + streamname = ctx.node.properties["streamname"], + jar_url = ctx.node.properties["jar_url"], + artifact_name = ctx.node.properties["artifact_name"], + artifact_version = ctx.node.properties["artifact_version"], + app_config = config_template, + app_preferences = ctx.node.properties["app_preferences"], + service_endpoints = ctx.node.properties["service_endpoints"], + programs = ctx.node.properties["programs"], + program_preferences = ctx.node.properties["program_preferences"], + logger = ctx.logger) + + response.raise_for_status() #bomb if not 2xx #TODO! Would be better to do an initial merge first before deploying, but the merge is complicated for CDAP #because of app config vs. app preferences. So, for now, let the broker do the work with an immediate reconfigure #get policies that may have changed prior to this blueprint deployment policy_configs = Policies.get_policy_configs() - ctx.logger.info("Updated policy configs: {0}".format(policy_configs)) - _trigger_update(policy_configs) + if policy_configs is not None: + ctx.logger.info("Updated policy configs: {0}".format(policy_configs)) + response = _trigger_update(policy_configs) + response.raise_for_status() #bomb if not 2xx except Exception as e: ctx.logger.error("Error depploying CDAP app: {er}".format(er=e)) @@ -189,11 +195,25 @@ def stop_and_undeploy_application(**kwargs): #per jack Lucas, do not raise Nonrecoverables on any delete operation. Keep going on them all, cleaning up as much as you can. #bombing would also bomb the deletion of the rest of the blueprint ctx.logger.info("Undeploying CDAP application") - try: #deregister with the broker, which will also take down the service from consul - discovery.delete_on_broker(ctx.instance.runtime_properties[SELECTED_BROKER], + response = discovery.delete_on_broker(ctx.instance.runtime_properties[SELECTED_BROKER], ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME], ctx.logger) + response.raise_for_status() #bomb if not 2xx + except Exception as e: + ctx.logger.error("Error deregistering from Broker, but continuing with deletion process: {0}".format(e)) + +@operation +def delete_all_registered_apps(connected_broker_dns_name, **kwargs): + """ + Used in the cdap broker deleter node. + Deletes all registered applications (in the broker) + per jack Lucas, do not raise Nonrecoverables on any delete operation. Keep going on them all, cleaning up as much as you can. + """ + ctx.logger.info("Undeploying CDAP application") + try: + response = discovery.delete_all_registered_apps(connected_broker_dns_name, ctx.logger) + response.raise_for_status() #bomb if not 2xx except Exception as e: ctx.logger.error("Error deregistering from Broker, but continuing with deletion process: {0}".format(e)) @@ -211,11 +231,12 @@ def app_config_reconfigure(new_config_template, **kwargs): """ try: ctx.logger.info("Reconfiguring CDAP application via app_config") - discovery.reconfigure_in_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER], + response = discovery.reconfigure_in_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER], service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME], config = new_config_template, #This keyname will likely change per policy handler reconfiguration_type = "program-flowlet-app-config", logger = ctx.logger) + response.raise_for_status() #bomb if not 2xx except Exception as e: raise NonRecoverableError("CDAP Reconfigure error: {0}".format(e)) @@ -226,11 +247,12 @@ def app_preferences_reconfigure(new_config_template, **kwargs): """ try: ctx.logger.info("Reconfiguring CDAP application via app_preferences") - discovery.reconfigure_in_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER], + response = discovery.reconfigure_in_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER], service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME], config = new_config_template, #This keyname will likely change per policy handler reconfiguration_type = "program-flowlet-app-preferences", logger = ctx.logger) + response.raise_for_status() #bomb if not 2xx except Exception as e: raise NonRecoverableError("CDAP Reconfigure error: {0}".format(e)) @@ -241,7 +263,8 @@ def app_smart_reconfigure(new_config_template, **kwargs): """ try: ctx.logger.info("Reconfiguring CDAP application via smart interface") - _trigger_update([new_config_template]) + response = _trigger_update([new_config_template]) + response.raise_for_status() #bomb if not 2xx except Exception as e: raise NonRecoverableError("CDAP Reconfigure error: {0}".format(e)) @@ -253,15 +276,8 @@ def policy_update(updated_policies, **kwargs): try: #TODO! In the future, if we really have many different policies, would be more efficient to do a single merge here. #However all use cases today are a single policy so OK with this for loop for now. - _trigger_update(updated_policies) + response = _trigger_update(updated_policies) + response.raise_for_status() #bomb if not 2xx except Exception as e: raise NonRecoverableError("CDAP Reconfigure error: {0}".format(e)) -@operation -def delete_all_registered_apps(connected_broker_dns_name, **kwargs): - """ - Used in the cdap broker deleter node. - Deletes all registered applications (in the broker) - """ - discovery.delete_all_registered_apps(connected_broker_dns_name, ctx.logger) - diff --git a/cdap/cdapplugin/cdapcloudify/discovery.py b/cdap/cdapplugin/cdapcloudify/discovery.py index c654cbb..12daebc 100644 --- a/cdap/cdapplugin/cdapcloudify/discovery.py +++ b/cdap/cdapplugin/cdapcloudify/discovery.py @@ -5,9 +5,9 @@ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -46,18 +46,18 @@ def _get_broker_url(cdap_broker_name, service_component_name, logger): """ public """ -def put_broker(cdap_broker_name, - service_component_name, - namespace, - streamname, +def put_broker(cdap_broker_name, + service_component_name, + namespace, + streamname, jar_url, - artifact_name, + artifact_name, artifact_version, app_config, - app_preferences, - service_endpoints, - programs, - program_preferences, + app_preferences, + service_endpoints, + programs, + program_preferences, logger): """ Conforms to Broker API 4.X @@ -75,16 +75,17 @@ def put_broker(cdap_broker_name, data["services"] = service_endpoints data["programs"] = programs data["program_preferences"] = program_preferences - + #register with the broker response = requests.put(_get_broker_url(cdap_broker_name, service_component_name, logger), - json = data, + json = data, headers = {'content-type':'application/json'}) logger.info((response, response.status_code, response.text)) - response.raise_for_status() #bomb if not 2xx -def reconfigure_in_broker(cdap_broker_name, - service_component_name, + return response #let the caller deal with the response + +def reconfigure_in_broker(cdap_broker_name, + service_component_name, config, reconfiguration_type, logger): @@ -92,16 +93,17 @@ def reconfigure_in_broker(cdap_broker_name, #man am I glad I broke the broker API from 3 to 4 to standardize this interface because now I only need one function here response = requests.put("{u}/reconfigure".format(u = _get_broker_url(cdap_broker_name, service_component_name, logger)), headers = {'content-type':'application/json'}, - json = {"reconfiguration_type" : reconfiguration_type, + json = {"reconfiguration_type" : reconfiguration_type, "config" : config}) logger.info((response, response.status_code, response.text)) - response.raise_for_status() #bomb if not 2xx + + return response #let the caller deal with the response def delete_on_broker(cdap_broker_name, service_component_name, logger): #deregister with the broker response = requests.delete(_get_broker_url(cdap_broker_name, service_component_name, logger)) logger.info((response, response.status_code, response.text)) - response.raise_for_status() #bomb if not 2xx + return response def delete_all_registered_apps(cdap_broker_name, logger): #get the broker connection @@ -112,8 +114,9 @@ def delete_all_registered_apps(cdap_broker_name, logger): logger.info("Trying to connect to broker called {0} at {1}".format(cdap_broker_name, broker_url)) registered_apps = json.loads(requests.get("{0}/application".format(broker_url)).text) #should be proper list of strings (appnames) logger.info("Trying to delete: {0}".format(registered_apps)) - r = requests.post("{0}/application/delete".format(broker_url), + response = requests.post("{0}/application/delete".format(broker_url), headers = {'content-type':'application/json'}, json = {"appnames" : registered_apps}) - logger.info("Response: {0}, Response Status: {1}".format(r.text, r.status_code)) + logger.info("Response: {0}, Response Status: {1}".format(response.text, response.status_code)) + return response |