summaryrefslogtreecommitdiffstats
path: root/cdap/cdapplugin
diff options
context:
space:
mode:
Diffstat (limited to 'cdap/cdapplugin')
-rw-r--r--cdap/cdapplugin/cdapcloudify/cdap_plugin.py167
-rw-r--r--cdap/cdapplugin/cdapcloudify/discovery.py38
-rw-r--r--cdap/cdapplugin/setup.py2
-rw-r--r--cdap/cdapplugin/tests/test_cdap_plugin.py42
-rw-r--r--cdap/cdapplugin/tests/test_discovery.py9
5 files changed, 144 insertions, 114 deletions
diff --git a/cdap/cdapplugin/cdapcloudify/cdap_plugin.py b/cdap/cdapplugin/cdapcloudify/cdap_plugin.py
index 00f80e2..d12a23b 100644
--- a/cdap/cdapplugin/cdapcloudify/cdap_plugin.py
+++ b/cdap/cdapplugin/cdapcloudify/cdap_plugin.py
@@ -53,11 +53,11 @@ def _trigger_update(updated_policies):
for p in updated_policies:
ctx.logger.info("Reconfiguring CDAP application via smart interface")
return discovery.reconfigure_in_broker(
- cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
- service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
- config = p,
- reconfiguration_type = "program-flowlet-smart",
- logger = ctx.logger)
+ cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
+ service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
+ config = p,
+ reconfiguration_type = "program-flowlet-smart",
+ logger = ctx.logger)
def _validate_conns(connections):
"""
@@ -111,13 +111,25 @@ def _services_calls_iterator(services_calls):
return for_config
######################
+# Decorators
+######################
+def try_raise_nonr(func):
+ def inner(*args, **kwargs):
+ try:
+ return func(*args, **kwargs)
+ except Exception as e:
+ raise NonRecoverableError(e)
+ return inner
+
+######################
# Cloudify Operations
######################
@operation
+@try_raise_nonr
def create(connected_broker_dns_name, **kwargs):
"""
- This is apparantly needed due to the order in which Cloudify relationships are handled in Cloudify.
+ setup critical runtime properties
"""
#fail fast
@@ -142,53 +154,49 @@ def create(connected_broker_dns_name, **kwargs):
ctx.instance.runtime_properties[SER_C] = _services_calls_iterator(ctx.node.properties["connections"][SERVICES_CALLS])
@operation
+@try_raise_nonr
@Policies.gather_policies_to_node
def deploy_and_start_application(**kwargs):
"""
pushes the application into the workspace and starts it
"""
- try:
- #parse TOSCA model params
- config_template = ctx.node.properties["app_config"]
-
- #there is a typed section in the node type called "connections", but the broker expects those two keys at the top level of app_config, so add them here
- #In cloudify you can't have a custom data type and then specify unknown propertys, the vlidation will fail, so typespeccing just part of app_config doesnt work
- #the rest of the CDAP app's app_config is app-dependent
- config_template[SERVICES_CALLS] = ctx.instance.runtime_properties[SER_C]
- config_template[STREAMS_PUBLISHES] = ctx.instance.runtime_properties[PUB_C]
- config_template[STREAMS_SUBSCRIBES] = ctx.instance.runtime_properties[SUB_C]
+ #parse TOSCA model params
+ config_template = ctx.node.properties["app_config"]
- #register with broker
- ctx.logger.info("Registering with Broker, config template was: {0}".format(json.dumps(config_template)))
- response = discovery.put_broker(
- cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
- service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
- namespace = ctx.node.properties["namespace"],
- streamname = ctx.node.properties["streamname"],
- jar_url = ctx.node.properties["jar_url"],
- artifact_name = ctx.node.properties["artifact_name"],
- artifact_version = ctx.node.properties["artifact_version"],
- app_config = config_template,
- app_preferences = ctx.node.properties["app_preferences"],
- service_endpoints = ctx.node.properties["service_endpoints"],
- programs = ctx.node.properties["programs"],
- program_preferences = ctx.node.properties["program_preferences"],
- logger = ctx.logger)
+ #there is a typed section in the node type called "connections", but the broker expects those two keys at the top level of app_config, so add them here
+ #In cloudify you can't have a custom data type and then specify unknown propertys, the vlidation will fail, so typespeccing just part of app_config doesnt work
+ #the rest of the CDAP app's app_config is app-dependent
+ config_template[SERVICES_CALLS] = ctx.instance.runtime_properties[SER_C]
+ config_template[STREAMS_PUBLISHES] = ctx.instance.runtime_properties[PUB_C]
+ config_template[STREAMS_SUBSCRIBES] = ctx.instance.runtime_properties[SUB_C]
- response.raise_for_status() #bomb if not 2xx
+ #register with broker
+ ctx.logger.info("Registering with Broker, config template was: {0}".format(json.dumps(config_template)))
+ response = discovery.put_broker(
+ cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
+ service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
+ namespace = ctx.node.properties["namespace"],
+ streamname = ctx.node.properties["streamname"],
+ jar_url = ctx.node.properties["jar_url"],
+ artifact_name = ctx.node.properties["artifact_name"],
+ artifact_version = ctx.node.properties["artifact_version"],
+ app_config = config_template,
+ app_preferences = ctx.node.properties["app_preferences"],
+ service_endpoints = ctx.node.properties["service_endpoints"],
+ programs = ctx.node.properties["programs"],
+ program_preferences = ctx.node.properties["program_preferences"],
+ logger = ctx.logger)
- #TODO! Would be better to do an initial merge first before deploying, but the merge is complicated for CDAP
- #because of app config vs. app preferences. So, for now, let the broker do the work with an immediate reconfigure
- #get policies that may have changed prior to this blueprint deployment
- policy_configs = Policies.get_policy_configs()
- if policy_configs is not None:
- ctx.logger.info("Updated policy configs: {0}".format(policy_configs))
- response = _trigger_update(policy_configs)
- response.raise_for_status() #bomb if not 2xx
+ response.raise_for_status() #bomb if not 2xx
- except Exception as e:
- ctx.logger.error("Error depploying CDAP app: {er}".format(er=e))
- raise NonRecoverableError(e)
+ #TODO! Would be better to do an initial merge first before deploying, but the merge is complicated for CDAP
+ #because of app config vs. app preferences. So, for now, let the broker do the work with an immediate reconfigure
+ #get policies that may have changed prior to this blueprint deployment
+ policy_configs = Policies.get_policy_configs()
+ if policy_configs is not None:
+ ctx.logger.info("Updated policy configs: {0}".format(policy_configs))
+ response = _trigger_update(policy_configs)
+ response.raise_for_status() #bomb if not 2xx
@operation
def stop_and_undeploy_application(**kwargs):
@@ -196,9 +204,10 @@ def stop_and_undeploy_application(**kwargs):
#bombing would also bomb the deletion of the rest of the blueprint
ctx.logger.info("Undeploying CDAP application")
try: #deregister with the broker, which will also take down the service from consul
- response = discovery.delete_on_broker(ctx.instance.runtime_properties[SELECTED_BROKER],
- ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
- ctx.logger)
+ response = discovery.delete_on_broker(
+ cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
+ service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
+ logger = ctx.logger)
response.raise_for_status() #bomb if not 2xx
except Exception as e:
ctx.logger.error("Error deregistering from Broker, but continuing with deletion process: {0}".format(e))
@@ -212,7 +221,9 @@ def delete_all_registered_apps(connected_broker_dns_name, **kwargs):
"""
ctx.logger.info("Undeploying CDAP application")
try:
- response = discovery.delete_all_registered_apps(connected_broker_dns_name, ctx.logger)
+ response = discovery.delete_all_registered_apps(
+ cdap_broker_name = connected_broker_dns_name,
+ logger = ctx.logger)
response.raise_for_status() #bomb if not 2xx
except Exception as e:
ctx.logger.error("Error deregistering from Broker, but continuing with deletion process: {0}".format(e))
@@ -225,59 +236,53 @@ def delete_all_registered_apps(connected_broker_dns_name, **kwargs):
# 3) broker deals with the rest
############
@operation
+@try_raise_nonr
def app_config_reconfigure(new_config_template, **kwargs):
"""
reconfigure the CDAP app's app config
"""
- try:
- ctx.logger.info("Reconfiguring CDAP application via app_config")
- response = discovery.reconfigure_in_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
- service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
- config = new_config_template, #This keyname will likely change per policy handler
- reconfiguration_type = "program-flowlet-app-config",
- logger = ctx.logger)
- response.raise_for_status() #bomb if not 2xx
- except Exception as e:
- raise NonRecoverableError("CDAP Reconfigure error: {0}".format(e))
+ ctx.logger.info("Reconfiguring CDAP application via app_config")
+ response = discovery.reconfigure_in_broker(
+ cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
+ service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
+ config = new_config_template, #This keyname will likely change per policy handler
+ reconfiguration_type = "program-flowlet-app-config",
+ logger = ctx.logger)
+ response.raise_for_status() #bomb if not 2xx
@operation
+@try_raise_nonr
def app_preferences_reconfigure(new_config_template, **kwargs):
"""
reconfigure the CDAP app's app preferences
"""
- try:
- ctx.logger.info("Reconfiguring CDAP application via app_preferences")
- response = discovery.reconfigure_in_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
- service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
- config = new_config_template, #This keyname will likely change per policy handler
- reconfiguration_type = "program-flowlet-app-preferences",
- logger = ctx.logger)
- response.raise_for_status() #bomb if not 2xx
- except Exception as e:
- raise NonRecoverableError("CDAP Reconfigure error: {0}".format(e))
+ ctx.logger.info("Reconfiguring CDAP application via app_preferences")
+ response = discovery.reconfigure_in_broker(
+ cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
+ service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
+ config = new_config_template, #This keyname will likely change per policy handler
+ reconfiguration_type = "program-flowlet-app-preferences",
+ logger = ctx.logger)
+ response.raise_for_status() #bomb if not 2xx
@operation
+@try_raise_nonr
def app_smart_reconfigure(new_config_template, **kwargs):
"""
reconfigure the CDAP app via the broker smart interface
"""
- try:
- ctx.logger.info("Reconfiguring CDAP application via smart interface")
- response = _trigger_update([new_config_template])
- response.raise_for_status() #bomb if not 2xx
- except Exception as e:
- raise NonRecoverableError("CDAP Reconfigure error: {0}".format(e))
+ ctx.logger.info("Reconfiguring CDAP application via smart interface")
+ response = _trigger_update([new_config_template])
+ response.raise_for_status() #bomb if not 2xx
@operation
+@try_raise_nonr
@Policies.update_policies_on_node(configs_only=True)
def policy_update(updated_policies, **kwargs):
#its already develiered through policy
ctx.logger.info("Policy update recieved. updated policies: {0}".format(updated_policies))
- try:
- #TODO! In the future, if we really have many different policies, would be more efficient to do a single merge here.
- #However all use cases today are a single policy so OK with this for loop for now.
- response = _trigger_update(updated_policies)
- response.raise_for_status() #bomb if not 2xx
- except Exception as e:
- raise NonRecoverableError("CDAP Reconfigure error: {0}".format(e))
+ #TODO! In the future, if we really have many different policies, would be more efficient to do a single merge here.
+ #However all use cases today are a single policy so OK with this for loop for now.
+ response = _trigger_update(updated_policies)
+ response.raise_for_status() #bomb if not 2xx
diff --git a/cdap/cdapplugin/cdapcloudify/discovery.py b/cdap/cdapplugin/cdapcloudify/discovery.py
index 12daebc..e20258f 100644
--- a/cdap/cdapplugin/cdapcloudify/discovery.py
+++ b/cdap/cdapplugin/cdapcloudify/discovery.py
@@ -44,8 +44,25 @@ def _get_broker_url(cdap_broker_name, service_component_name, logger):
return broker_url
"""
+decorators
+"""
+def run_response(func):
+ """
+ decorator for generic http call, log the response, and return the flask response
+
+ make sure you call the functons below using logger as a kwarg!
+ """
+ def inner(*args, **kwargs):
+ logger = kwargs["logger"]
+ response = func(*args, **kwargs)
+ logger.info((response, response.status_code, response.text))
+ return response #let the caller deal with the response
+ return inner
+
+"""
public
"""
+@run_response
def put_broker(cdap_broker_name,
service_component_name,
namespace,
@@ -77,13 +94,11 @@ def put_broker(cdap_broker_name,
data["program_preferences"] = program_preferences
#register with the broker
- response = requests.put(_get_broker_url(cdap_broker_name, service_component_name, logger),
+ return requests.put(_get_broker_url(cdap_broker_name, service_component_name, logger),
json = data,
headers = {'content-type':'application/json'})
- logger.info((response, response.status_code, response.text))
-
- return response #let the caller deal with the response
+@run_response
def reconfigure_in_broker(cdap_broker_name,
service_component_name,
config,
@@ -91,20 +106,17 @@ def reconfigure_in_broker(cdap_broker_name,
logger):
#trigger a reconfiguration with the broker
#man am I glad I broke the broker API from 3 to 4 to standardize this interface because now I only need one function here
- response = requests.put("{u}/reconfigure".format(u = _get_broker_url(cdap_broker_name, service_component_name, logger)),
+ return requests.put("{u}/reconfigure".format(u = _get_broker_url(cdap_broker_name, service_component_name, logger)),
headers = {'content-type':'application/json'},
json = {"reconfiguration_type" : reconfiguration_type,
"config" : config})
- logger.info((response, response.status_code, response.text))
-
- return response #let the caller deal with the response
+@run_response
def delete_on_broker(cdap_broker_name, service_component_name, logger):
#deregister with the broker
- response = requests.delete(_get_broker_url(cdap_broker_name, service_component_name, logger))
- logger.info((response, response.status_code, response.text))
- return response
+ return requests.delete(_get_broker_url(cdap_broker_name, service_component_name, logger))
+@run_response
def delete_all_registered_apps(cdap_broker_name, logger):
#get the broker connection
broker_ip, broker_port = _get_connection_info_from_consul(cdap_broker_name, logger)
@@ -114,9 +126,7 @@ def delete_all_registered_apps(cdap_broker_name, logger):
logger.info("Trying to connect to broker called {0} at {1}".format(cdap_broker_name, broker_url))
registered_apps = json.loads(requests.get("{0}/application".format(broker_url)).text) #should be proper list of strings (appnames)
logger.info("Trying to delete: {0}".format(registered_apps))
- response = requests.post("{0}/application/delete".format(broker_url),
+ return requests.post("{0}/application/delete".format(broker_url),
headers = {'content-type':'application/json'},
json = {"appnames" : registered_apps})
- logger.info("Response: {0}, Response Status: {1}".format(response.text, response.status_code))
- return response
diff --git a/cdap/cdapplugin/setup.py b/cdap/cdapplugin/setup.py
index b683bd6..729ff1d 100644
--- a/cdap/cdapplugin/setup.py
+++ b/cdap/cdapplugin/setup.py
@@ -22,7 +22,7 @@ from setuptools import setup, find_packages
setup(
name = "cdapcloudify",
- version = "14.2.3",
+ version = "14.2.4",
packages=find_packages(),
author = "Tommy Carpenter",
author_email = "tommy at research dot eh tee tee dot com",
diff --git a/cdap/cdapplugin/tests/test_cdap_plugin.py b/cdap/cdapplugin/tests/test_cdap_plugin.py
index 7434fe8..f28485d 100644
--- a/cdap/cdapplugin/tests/test_cdap_plugin.py
+++ b/cdap/cdapplugin/tests/test_cdap_plugin.py
@@ -5,9 +5,9 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,59 +17,60 @@
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-from cdapcloudify.cdap_plugin import _validate_conns, BadConnections
+from cdapcloudify.cdap_plugin import _validate_conns, BadConnections, try_raise_nonr
import pytest
+from cloudify.exceptions import NonRecoverableError
#todo.. add more tests.. #shame
def _get_good_connection():
connections = {}
connections["streams_publishes"] = [
- {"name" : "test_n",
+ {"name" : "test_n",
"location" : "test_l",
"client_role" : "test_cr",
"type" : "message_router",
"config_key" : "test_c",
"aaf_username": "test_u",
- "aaf_password": "test_p"
+ "aaf_password": "test_p"
},
- {"name" : "test_n2",
+ {"name" : "test_n2",
"location" : "test_l",
"client_role" : "test_cr",
"type" : "message_router",
"config_key" : "test_c",
"aaf_username": "test_u",
- "aaf_password": "test_p"
+ "aaf_password": "test_p"
},
- {"name" : "test_feed00",
+ {"name" : "test_feed00",
"location" : "test_l",
"type" : "data_router",
"config_key" : "mydrconfigkey"
}
]
connections["streams_subscribes"] = [
- {"name" : "test_n",
+ {"name" : "test_n",
"location" : "test_l",
"client_role" : "test_cr",
"type" : "message_router",
"config_key" : "test_c",
"aaf_username": "test_u",
- "aaf_password": "test_p"
+ "aaf_password": "test_p"
},
- {"name" : "test_n2",
+ {"name" : "test_n2",
"location" : "test_l",
"client_role" : "test_cr",
"type" : "message_router",
"config_key" : "test_c",
"aaf_username": "test_u",
- "aaf_password": "test_p"
+ "aaf_password": "test_p"
}
]
return connections
def test_validate_cons():
#test good streams
- good_conn = _get_good_connection()
+ good_conn = _get_good_connection()
_validate_conns(good_conn)
#mutate
@@ -85,3 +86,18 @@ def test_validate_cons():
with pytest.raises(BadConnections) as excinfo:
_validate_conns(noloc)
+def test_nonr_dec():
+ def blow():
+ d = {}
+ d["emptyinside"] += 1
+ return d
+ #apply decorator
+ blow = try_raise_nonr(blow)
+ with pytest.raises(NonRecoverableError):
+ blow()
+
+ def work():
+ return 666
+ work = try_raise_nonr(work)
+ assert work() == 666
+
diff --git a/cdap/cdapplugin/tests/test_discovery.py b/cdap/cdapplugin/tests/test_discovery.py
index 7ee59c4..7354f4e 100644
--- a/cdap/cdapplugin/tests/test_discovery.py
+++ b/cdap/cdapplugin/tests/test_discovery.py
@@ -63,7 +63,7 @@ def test_put_broker(monkeypatch):
"test_se",
"test_p",
"test_pp",
- logger)
+ logger = logger)
assert R.text == "URL: http://666.666.666.666:666/application/test_scn, headers {'content-type': 'application/json'}"
assert R.json == {'app_preferences': 'test_ap', 'services': 'test_se', 'namespace': 'test_ns', 'programs': 'test_p', 'cdap_application_type': 'program-flowlet', 'app_config': 'test_ac', 'streamname': 'test_sn', 'program_preferences': 'test_pp', 'artifact_name': 'test_an', 'jar_url': 'test_ju', 'artifact_version': 'test_av'}
@@ -77,7 +77,7 @@ def test_reconfigure_in_broker(monkeypatch):
_TEST_SCN,
{"redome" : "baby"},
"program-flowlet-app-config",
- logger)
+ logger = logger)
assert R.text == "URL: http://666.666.666.666:666/application/test_scn/reconfigure, headers {'content-type': 'application/json'}"
assert R.json == {'reconfiguration_type': 'program-flowlet-app-config', 'config': {'redome': 'baby'}}
assert R.status_code == 200
@@ -88,8 +88,7 @@ def test_delete_on_broker(monkeypatch):
R = discovery.delete_on_broker(
_TEST_BROKER_NAME,
_TEST_SCN,
- logger)
- print(R.text)
+ logger = logger)
assert R.text == "URL: http://666.666.666.666:666/application/test_scn"
assert R.status_code == 200
@@ -108,7 +107,7 @@ def test_multi_delete(monkeypatch):
monkeypatch.setattr('requests.post', _fake_putpost)
R = discovery.delete_all_registered_apps(
_TEST_BROKER_NAME,
- logger)
+ logger = logger)
assert R.text == "URL: http://666.666.666.666:666/application/delete, headers {'content-type': 'application/json'}"
assert R.status_code == 200