summaryrefslogtreecommitdiffstats
path: root/cdap/cdapplugin/cdapcloudify
diff options
context:
space:
mode:
Diffstat (limited to 'cdap/cdapplugin/cdapcloudify')
-rw-r--r--cdap/cdapplugin/cdapcloudify/cdap_plugin.py74
-rw-r--r--cdap/cdapplugin/cdapcloudify/discovery.py45
2 files changed, 69 insertions, 50 deletions
diff --git a/cdap/cdapplugin/cdapcloudify/cdap_plugin.py b/cdap/cdapplugin/cdapcloudify/cdap_plugin.py
index 6f4134e..00f80e2 100644
--- a/cdap/cdapplugin/cdapcloudify/cdap_plugin.py
+++ b/cdap/cdapplugin/cdapcloudify/cdap_plugin.py
@@ -27,6 +27,7 @@ import uuid
import re
from cdapcloudify import discovery
import json
+import requests
# Property keys
SERVICE_COMPONENT_NAME = "service_component_name"
@@ -159,26 +160,31 @@ def deploy_and_start_application(**kwargs):
#register with broker
ctx.logger.info("Registering with Broker, config template was: {0}".format(json.dumps(config_template)))
- discovery.put_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
- service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
- namespace = ctx.node.properties["namespace"],
- streamname = ctx.node.properties["streamname"],
- jar_url = ctx.node.properties["jar_url"],
- artifact_name = ctx.node.properties["artifact_name"],
- artifact_version = ctx.node.properties["artifact_version"],
- app_config = config_template,
- app_preferences = ctx.node.properties["app_preferences"],
- service_endpoints = ctx.node.properties["service_endpoints"],
- programs = ctx.node.properties["programs"],
- program_preferences = ctx.node.properties["program_preferences"],
- logger = ctx.logger)
+ response = discovery.put_broker(
+ cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
+ service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
+ namespace = ctx.node.properties["namespace"],
+ streamname = ctx.node.properties["streamname"],
+ jar_url = ctx.node.properties["jar_url"],
+ artifact_name = ctx.node.properties["artifact_name"],
+ artifact_version = ctx.node.properties["artifact_version"],
+ app_config = config_template,
+ app_preferences = ctx.node.properties["app_preferences"],
+ service_endpoints = ctx.node.properties["service_endpoints"],
+ programs = ctx.node.properties["programs"],
+ program_preferences = ctx.node.properties["program_preferences"],
+ logger = ctx.logger)
+
+ response.raise_for_status() #bomb if not 2xx
#TODO! Would be better to do an initial merge first before deploying, but the merge is complicated for CDAP
#because of app config vs. app preferences. So, for now, let the broker do the work with an immediate reconfigure
#get policies that may have changed prior to this blueprint deployment
policy_configs = Policies.get_policy_configs()
- ctx.logger.info("Updated policy configs: {0}".format(policy_configs))
- _trigger_update(policy_configs)
+ if policy_configs is not None:
+ ctx.logger.info("Updated policy configs: {0}".format(policy_configs))
+ response = _trigger_update(policy_configs)
+ response.raise_for_status() #bomb if not 2xx
except Exception as e:
ctx.logger.error("Error depploying CDAP app: {er}".format(er=e))
@@ -189,11 +195,25 @@ def stop_and_undeploy_application(**kwargs):
#per jack Lucas, do not raise Nonrecoverables on any delete operation. Keep going on them all, cleaning up as much as you can.
#bombing would also bomb the deletion of the rest of the blueprint
ctx.logger.info("Undeploying CDAP application")
-
try: #deregister with the broker, which will also take down the service from consul
- discovery.delete_on_broker(ctx.instance.runtime_properties[SELECTED_BROKER],
+ response = discovery.delete_on_broker(ctx.instance.runtime_properties[SELECTED_BROKER],
ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
ctx.logger)
+ response.raise_for_status() #bomb if not 2xx
+ except Exception as e:
+ ctx.logger.error("Error deregistering from Broker, but continuing with deletion process: {0}".format(e))
+
+@operation
+def delete_all_registered_apps(connected_broker_dns_name, **kwargs):
+ """
+ Used in the cdap broker deleter node.
+ Deletes all registered applications (in the broker)
+ per jack Lucas, do not raise Nonrecoverables on any delete operation. Keep going on them all, cleaning up as much as you can.
+ """
+ ctx.logger.info("Undeploying CDAP application")
+ try:
+ response = discovery.delete_all_registered_apps(connected_broker_dns_name, ctx.logger)
+ response.raise_for_status() #bomb if not 2xx
except Exception as e:
ctx.logger.error("Error deregistering from Broker, but continuing with deletion process: {0}".format(e))
@@ -211,11 +231,12 @@ def app_config_reconfigure(new_config_template, **kwargs):
"""
try:
ctx.logger.info("Reconfiguring CDAP application via app_config")
- discovery.reconfigure_in_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
+ response = discovery.reconfigure_in_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
config = new_config_template, #This keyname will likely change per policy handler
reconfiguration_type = "program-flowlet-app-config",
logger = ctx.logger)
+ response.raise_for_status() #bomb if not 2xx
except Exception as e:
raise NonRecoverableError("CDAP Reconfigure error: {0}".format(e))
@@ -226,11 +247,12 @@ def app_preferences_reconfigure(new_config_template, **kwargs):
"""
try:
ctx.logger.info("Reconfiguring CDAP application via app_preferences")
- discovery.reconfigure_in_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
+ response = discovery.reconfigure_in_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
config = new_config_template, #This keyname will likely change per policy handler
reconfiguration_type = "program-flowlet-app-preferences",
logger = ctx.logger)
+ response.raise_for_status() #bomb if not 2xx
except Exception as e:
raise NonRecoverableError("CDAP Reconfigure error: {0}".format(e))
@@ -241,7 +263,8 @@ def app_smart_reconfigure(new_config_template, **kwargs):
"""
try:
ctx.logger.info("Reconfiguring CDAP application via smart interface")
- _trigger_update([new_config_template])
+ response = _trigger_update([new_config_template])
+ response.raise_for_status() #bomb if not 2xx
except Exception as e:
raise NonRecoverableError("CDAP Reconfigure error: {0}".format(e))
@@ -253,15 +276,8 @@ def policy_update(updated_policies, **kwargs):
try:
#TODO! In the future, if we really have many different policies, would be more efficient to do a single merge here.
#However all use cases today are a single policy so OK with this for loop for now.
- _trigger_update(updated_policies)
+ response = _trigger_update(updated_policies)
+ response.raise_for_status() #bomb if not 2xx
except Exception as e:
raise NonRecoverableError("CDAP Reconfigure error: {0}".format(e))
-@operation
-def delete_all_registered_apps(connected_broker_dns_name, **kwargs):
- """
- Used in the cdap broker deleter node.
- Deletes all registered applications (in the broker)
- """
- discovery.delete_all_registered_apps(connected_broker_dns_name, ctx.logger)
-
diff --git a/cdap/cdapplugin/cdapcloudify/discovery.py b/cdap/cdapplugin/cdapcloudify/discovery.py
index c654cbb..12daebc 100644
--- a/cdap/cdapplugin/cdapcloudify/discovery.py
+++ b/cdap/cdapplugin/cdapcloudify/discovery.py
@@ -5,9 +5,9 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -46,18 +46,18 @@ def _get_broker_url(cdap_broker_name, service_component_name, logger):
"""
public
"""
-def put_broker(cdap_broker_name,
- service_component_name,
- namespace,
- streamname,
+def put_broker(cdap_broker_name,
+ service_component_name,
+ namespace,
+ streamname,
jar_url,
- artifact_name,
+ artifact_name,
artifact_version,
app_config,
- app_preferences,
- service_endpoints,
- programs,
- program_preferences,
+ app_preferences,
+ service_endpoints,
+ programs,
+ program_preferences,
logger):
"""
Conforms to Broker API 4.X
@@ -75,16 +75,17 @@ def put_broker(cdap_broker_name,
data["services"] = service_endpoints
data["programs"] = programs
data["program_preferences"] = program_preferences
-
+
#register with the broker
response = requests.put(_get_broker_url(cdap_broker_name, service_component_name, logger),
- json = data,
+ json = data,
headers = {'content-type':'application/json'})
logger.info((response, response.status_code, response.text))
- response.raise_for_status() #bomb if not 2xx
-def reconfigure_in_broker(cdap_broker_name,
- service_component_name,
+ return response #let the caller deal with the response
+
+def reconfigure_in_broker(cdap_broker_name,
+ service_component_name,
config,
reconfiguration_type,
logger):
@@ -92,16 +93,17 @@ def reconfigure_in_broker(cdap_broker_name,
#man am I glad I broke the broker API from 3 to 4 to standardize this interface because now I only need one function here
response = requests.put("{u}/reconfigure".format(u = _get_broker_url(cdap_broker_name, service_component_name, logger)),
headers = {'content-type':'application/json'},
- json = {"reconfiguration_type" : reconfiguration_type,
+ json = {"reconfiguration_type" : reconfiguration_type,
"config" : config})
logger.info((response, response.status_code, response.text))
- response.raise_for_status() #bomb if not 2xx
+
+ return response #let the caller deal with the response
def delete_on_broker(cdap_broker_name, service_component_name, logger):
#deregister with the broker
response = requests.delete(_get_broker_url(cdap_broker_name, service_component_name, logger))
logger.info((response, response.status_code, response.text))
- response.raise_for_status() #bomb if not 2xx
+ return response
def delete_all_registered_apps(cdap_broker_name, logger):
#get the broker connection
@@ -112,8 +114,9 @@ def delete_all_registered_apps(cdap_broker_name, logger):
logger.info("Trying to connect to broker called {0} at {1}".format(cdap_broker_name, broker_url))
registered_apps = json.loads(requests.get("{0}/application".format(broker_url)).text) #should be proper list of strings (appnames)
logger.info("Trying to delete: {0}".format(registered_apps))
- r = requests.post("{0}/application/delete".format(broker_url),
+ response = requests.post("{0}/application/delete".format(broker_url),
headers = {'content-type':'application/json'},
json = {"appnames" : registered_apps})
- logger.info("Response: {0}, Response Status: {1}".format(r.text, r.status_code))
+ logger.info("Response: {0}, Response Status: {1}".format(response.text, response.status_code))
+ return response