From 770b47da0f4a33b22d757510c9108f30d6cbbd51 Mon Sep 17 00:00:00 2001 From: Tommy Carpenter Date: Thu, 14 Sep 2017 21:54:32 -0400 Subject: DRY up code with decorators Issue-ID: DCAEGEN2-74 Change-Id: Iefdfe3fa60879161ebc6ba3224f342a9af575d94 Signed-off-by: Tommy Carpenter --- cdap/cdapplugin/cdapcloudify/cdap_plugin.py | 167 ++++++++++++++-------------- cdap/cdapplugin/cdapcloudify/discovery.py | 38 ++++--- cdap/cdapplugin/setup.py | 2 +- cdap/cdapplugin/tests/test_cdap_plugin.py | 42 ++++--- cdap/cdapplugin/tests/test_discovery.py | 9 +- 5 files changed, 144 insertions(+), 114 deletions(-) (limited to 'cdap/cdapplugin') diff --git a/cdap/cdapplugin/cdapcloudify/cdap_plugin.py b/cdap/cdapplugin/cdapcloudify/cdap_plugin.py index 00f80e2..d12a23b 100644 --- a/cdap/cdapplugin/cdapcloudify/cdap_plugin.py +++ b/cdap/cdapplugin/cdapcloudify/cdap_plugin.py @@ -53,11 +53,11 @@ def _trigger_update(updated_policies): for p in updated_policies: ctx.logger.info("Reconfiguring CDAP application via smart interface") return discovery.reconfigure_in_broker( - cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER], - service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME], - config = p, - reconfiguration_type = "program-flowlet-smart", - logger = ctx.logger) + cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER], + service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME], + config = p, + reconfiguration_type = "program-flowlet-smart", + logger = ctx.logger) def _validate_conns(connections): """ @@ -110,14 +110,26 @@ def _services_calls_iterator(services_calls): for_config[s["config_key"]] = "{{ " + s["service_component_type"] + " }}" #will get bound by CBS return for_config +###################### +# Decorators +###################### +def try_raise_nonr(func): + def inner(*args, **kwargs): + try: + return func(*args, **kwargs) + except Exception as e: + raise NonRecoverableError(e) + return inner + ###################### # Cloudify Operations ###################### @operation +@try_raise_nonr def create(connected_broker_dns_name, **kwargs): """ - This is apparantly needed due to the order in which Cloudify relationships are handled in Cloudify. + setup critical runtime properties """ #fail fast @@ -142,53 +154,49 @@ def create(connected_broker_dns_name, **kwargs): ctx.instance.runtime_properties[SER_C] = _services_calls_iterator(ctx.node.properties["connections"][SERVICES_CALLS]) @operation +@try_raise_nonr @Policies.gather_policies_to_node def deploy_and_start_application(**kwargs): """ pushes the application into the workspace and starts it """ - try: - #parse TOSCA model params - config_template = ctx.node.properties["app_config"] - - #there is a typed section in the node type called "connections", but the broker expects those two keys at the top level of app_config, so add them here - #In cloudify you can't have a custom data type and then specify unknown propertys, the vlidation will fail, so typespeccing just part of app_config doesnt work - #the rest of the CDAP app's app_config is app-dependent - config_template[SERVICES_CALLS] = ctx.instance.runtime_properties[SER_C] - config_template[STREAMS_PUBLISHES] = ctx.instance.runtime_properties[PUB_C] - config_template[STREAMS_SUBSCRIBES] = ctx.instance.runtime_properties[SUB_C] + #parse TOSCA model params + config_template = ctx.node.properties["app_config"] - #register with broker - ctx.logger.info("Registering with Broker, config template was: {0}".format(json.dumps(config_template))) - response = discovery.put_broker( - cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER], - service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME], - namespace = ctx.node.properties["namespace"], - streamname = ctx.node.properties["streamname"], - jar_url = ctx.node.properties["jar_url"], - artifact_name = ctx.node.properties["artifact_name"], - artifact_version = ctx.node.properties["artifact_version"], - app_config = config_template, - app_preferences = ctx.node.properties["app_preferences"], - service_endpoints = ctx.node.properties["service_endpoints"], - programs = ctx.node.properties["programs"], - program_preferences = ctx.node.properties["program_preferences"], - logger = ctx.logger) + #there is a typed section in the node type called "connections", but the broker expects those two keys at the top level of app_config, so add them here + #In cloudify you can't have a custom data type and then specify unknown propertys, the vlidation will fail, so typespeccing just part of app_config doesnt work + #the rest of the CDAP app's app_config is app-dependent + config_template[SERVICES_CALLS] = ctx.instance.runtime_properties[SER_C] + config_template[STREAMS_PUBLISHES] = ctx.instance.runtime_properties[PUB_C] + config_template[STREAMS_SUBSCRIBES] = ctx.instance.runtime_properties[SUB_C] - response.raise_for_status() #bomb if not 2xx + #register with broker + ctx.logger.info("Registering with Broker, config template was: {0}".format(json.dumps(config_template))) + response = discovery.put_broker( + cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER], + service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME], + namespace = ctx.node.properties["namespace"], + streamname = ctx.node.properties["streamname"], + jar_url = ctx.node.properties["jar_url"], + artifact_name = ctx.node.properties["artifact_name"], + artifact_version = ctx.node.properties["artifact_version"], + app_config = config_template, + app_preferences = ctx.node.properties["app_preferences"], + service_endpoints = ctx.node.properties["service_endpoints"], + programs = ctx.node.properties["programs"], + program_preferences = ctx.node.properties["program_preferences"], + logger = ctx.logger) - #TODO! Would be better to do an initial merge first before deploying, but the merge is complicated for CDAP - #because of app config vs. app preferences. So, for now, let the broker do the work with an immediate reconfigure - #get policies that may have changed prior to this blueprint deployment - policy_configs = Policies.get_policy_configs() - if policy_configs is not None: - ctx.logger.info("Updated policy configs: {0}".format(policy_configs)) - response = _trigger_update(policy_configs) - response.raise_for_status() #bomb if not 2xx + response.raise_for_status() #bomb if not 2xx - except Exception as e: - ctx.logger.error("Error depploying CDAP app: {er}".format(er=e)) - raise NonRecoverableError(e) + #TODO! Would be better to do an initial merge first before deploying, but the merge is complicated for CDAP + #because of app config vs. app preferences. So, for now, let the broker do the work with an immediate reconfigure + #get policies that may have changed prior to this blueprint deployment + policy_configs = Policies.get_policy_configs() + if policy_configs is not None: + ctx.logger.info("Updated policy configs: {0}".format(policy_configs)) + response = _trigger_update(policy_configs) + response.raise_for_status() #bomb if not 2xx @operation def stop_and_undeploy_application(**kwargs): @@ -196,9 +204,10 @@ def stop_and_undeploy_application(**kwargs): #bombing would also bomb the deletion of the rest of the blueprint ctx.logger.info("Undeploying CDAP application") try: #deregister with the broker, which will also take down the service from consul - response = discovery.delete_on_broker(ctx.instance.runtime_properties[SELECTED_BROKER], - ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME], - ctx.logger) + response = discovery.delete_on_broker( + cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER], + service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME], + logger = ctx.logger) response.raise_for_status() #bomb if not 2xx except Exception as e: ctx.logger.error("Error deregistering from Broker, but continuing with deletion process: {0}".format(e)) @@ -212,7 +221,9 @@ def delete_all_registered_apps(connected_broker_dns_name, **kwargs): """ ctx.logger.info("Undeploying CDAP application") try: - response = discovery.delete_all_registered_apps(connected_broker_dns_name, ctx.logger) + response = discovery.delete_all_registered_apps( + cdap_broker_name = connected_broker_dns_name, + logger = ctx.logger) response.raise_for_status() #bomb if not 2xx except Exception as e: ctx.logger.error("Error deregistering from Broker, but continuing with deletion process: {0}".format(e)) @@ -225,59 +236,53 @@ def delete_all_registered_apps(connected_broker_dns_name, **kwargs): # 3) broker deals with the rest ############ @operation +@try_raise_nonr def app_config_reconfigure(new_config_template, **kwargs): """ reconfigure the CDAP app's app config """ - try: - ctx.logger.info("Reconfiguring CDAP application via app_config") - response = discovery.reconfigure_in_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER], - service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME], - config = new_config_template, #This keyname will likely change per policy handler - reconfiguration_type = "program-flowlet-app-config", - logger = ctx.logger) - response.raise_for_status() #bomb if not 2xx - except Exception as e: - raise NonRecoverableError("CDAP Reconfigure error: {0}".format(e)) + ctx.logger.info("Reconfiguring CDAP application via app_config") + response = discovery.reconfigure_in_broker( + cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER], + service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME], + config = new_config_template, #This keyname will likely change per policy handler + reconfiguration_type = "program-flowlet-app-config", + logger = ctx.logger) + response.raise_for_status() #bomb if not 2xx @operation +@try_raise_nonr def app_preferences_reconfigure(new_config_template, **kwargs): """ reconfigure the CDAP app's app preferences """ - try: - ctx.logger.info("Reconfiguring CDAP application via app_preferences") - response = discovery.reconfigure_in_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER], - service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME], - config = new_config_template, #This keyname will likely change per policy handler - reconfiguration_type = "program-flowlet-app-preferences", - logger = ctx.logger) - response.raise_for_status() #bomb if not 2xx - except Exception as e: - raise NonRecoverableError("CDAP Reconfigure error: {0}".format(e)) + ctx.logger.info("Reconfiguring CDAP application via app_preferences") + response = discovery.reconfigure_in_broker( + cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER], + service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME], + config = new_config_template, #This keyname will likely change per policy handler + reconfiguration_type = "program-flowlet-app-preferences", + logger = ctx.logger) + response.raise_for_status() #bomb if not 2xx @operation +@try_raise_nonr def app_smart_reconfigure(new_config_template, **kwargs): """ reconfigure the CDAP app via the broker smart interface """ - try: - ctx.logger.info("Reconfiguring CDAP application via smart interface") - response = _trigger_update([new_config_template]) - response.raise_for_status() #bomb if not 2xx - except Exception as e: - raise NonRecoverableError("CDAP Reconfigure error: {0}".format(e)) + ctx.logger.info("Reconfiguring CDAP application via smart interface") + response = _trigger_update([new_config_template]) + response.raise_for_status() #bomb if not 2xx @operation +@try_raise_nonr @Policies.update_policies_on_node(configs_only=True) def policy_update(updated_policies, **kwargs): #its already develiered through policy ctx.logger.info("Policy update recieved. updated policies: {0}".format(updated_policies)) - try: - #TODO! In the future, if we really have many different policies, would be more efficient to do a single merge here. - #However all use cases today are a single policy so OK with this for loop for now. - response = _trigger_update(updated_policies) - response.raise_for_status() #bomb if not 2xx - except Exception as e: - raise NonRecoverableError("CDAP Reconfigure error: {0}".format(e)) + #TODO! In the future, if we really have many different policies, would be more efficient to do a single merge here. + #However all use cases today are a single policy so OK with this for loop for now. + response = _trigger_update(updated_policies) + response.raise_for_status() #bomb if not 2xx diff --git a/cdap/cdapplugin/cdapcloudify/discovery.py b/cdap/cdapplugin/cdapcloudify/discovery.py index 12daebc..e20258f 100644 --- a/cdap/cdapplugin/cdapcloudify/discovery.py +++ b/cdap/cdapplugin/cdapcloudify/discovery.py @@ -43,9 +43,26 @@ def _get_broker_url(cdap_broker_name, service_component_name, logger): logger.info("Trying to connect to broker endpoint: {0}".format(broker_url)) return broker_url +""" +decorators +""" +def run_response(func): + """ + decorator for generic http call, log the response, and return the flask response + + make sure you call the functons below using logger as a kwarg! + """ + def inner(*args, **kwargs): + logger = kwargs["logger"] + response = func(*args, **kwargs) + logger.info((response, response.status_code, response.text)) + return response #let the caller deal with the response + return inner + """ public """ +@run_response def put_broker(cdap_broker_name, service_component_name, namespace, @@ -77,13 +94,11 @@ def put_broker(cdap_broker_name, data["program_preferences"] = program_preferences #register with the broker - response = requests.put(_get_broker_url(cdap_broker_name, service_component_name, logger), + return requests.put(_get_broker_url(cdap_broker_name, service_component_name, logger), json = data, headers = {'content-type':'application/json'}) - logger.info((response, response.status_code, response.text)) - - return response #let the caller deal with the response +@run_response def reconfigure_in_broker(cdap_broker_name, service_component_name, config, @@ -91,20 +106,17 @@ def reconfigure_in_broker(cdap_broker_name, logger): #trigger a reconfiguration with the broker #man am I glad I broke the broker API from 3 to 4 to standardize this interface because now I only need one function here - response = requests.put("{u}/reconfigure".format(u = _get_broker_url(cdap_broker_name, service_component_name, logger)), + return requests.put("{u}/reconfigure".format(u = _get_broker_url(cdap_broker_name, service_component_name, logger)), headers = {'content-type':'application/json'}, json = {"reconfiguration_type" : reconfiguration_type, "config" : config}) - logger.info((response, response.status_code, response.text)) - - return response #let the caller deal with the response +@run_response def delete_on_broker(cdap_broker_name, service_component_name, logger): #deregister with the broker - response = requests.delete(_get_broker_url(cdap_broker_name, service_component_name, logger)) - logger.info((response, response.status_code, response.text)) - return response + return requests.delete(_get_broker_url(cdap_broker_name, service_component_name, logger)) +@run_response def delete_all_registered_apps(cdap_broker_name, logger): #get the broker connection broker_ip, broker_port = _get_connection_info_from_consul(cdap_broker_name, logger) @@ -114,9 +126,7 @@ def delete_all_registered_apps(cdap_broker_name, logger): logger.info("Trying to connect to broker called {0} at {1}".format(cdap_broker_name, broker_url)) registered_apps = json.loads(requests.get("{0}/application".format(broker_url)).text) #should be proper list of strings (appnames) logger.info("Trying to delete: {0}".format(registered_apps)) - response = requests.post("{0}/application/delete".format(broker_url), + return requests.post("{0}/application/delete".format(broker_url), headers = {'content-type':'application/json'}, json = {"appnames" : registered_apps}) - logger.info("Response: {0}, Response Status: {1}".format(response.text, response.status_code)) - return response diff --git a/cdap/cdapplugin/setup.py b/cdap/cdapplugin/setup.py index b683bd6..729ff1d 100644 --- a/cdap/cdapplugin/setup.py +++ b/cdap/cdapplugin/setup.py @@ -22,7 +22,7 @@ from setuptools import setup, find_packages setup( name = "cdapcloudify", - version = "14.2.3", + version = "14.2.4", packages=find_packages(), author = "Tommy Carpenter", author_email = "tommy at research dot eh tee tee dot com", diff --git a/cdap/cdapplugin/tests/test_cdap_plugin.py b/cdap/cdapplugin/tests/test_cdap_plugin.py index 7434fe8..f28485d 100644 --- a/cdap/cdapplugin/tests/test_cdap_plugin.py +++ b/cdap/cdapplugin/tests/test_cdap_plugin.py @@ -5,9 +5,9 @@ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,59 +17,60 @@ # # ECOMP is a trademark and service mark of AT&T Intellectual Property. -from cdapcloudify.cdap_plugin import _validate_conns, BadConnections +from cdapcloudify.cdap_plugin import _validate_conns, BadConnections, try_raise_nonr import pytest +from cloudify.exceptions import NonRecoverableError #todo.. add more tests.. #shame def _get_good_connection(): connections = {} connections["streams_publishes"] = [ - {"name" : "test_n", + {"name" : "test_n", "location" : "test_l", "client_role" : "test_cr", "type" : "message_router", "config_key" : "test_c", "aaf_username": "test_u", - "aaf_password": "test_p" + "aaf_password": "test_p" }, - {"name" : "test_n2", + {"name" : "test_n2", "location" : "test_l", "client_role" : "test_cr", "type" : "message_router", "config_key" : "test_c", "aaf_username": "test_u", - "aaf_password": "test_p" + "aaf_password": "test_p" }, - {"name" : "test_feed00", + {"name" : "test_feed00", "location" : "test_l", "type" : "data_router", "config_key" : "mydrconfigkey" } ] connections["streams_subscribes"] = [ - {"name" : "test_n", + {"name" : "test_n", "location" : "test_l", "client_role" : "test_cr", "type" : "message_router", "config_key" : "test_c", "aaf_username": "test_u", - "aaf_password": "test_p" + "aaf_password": "test_p" }, - {"name" : "test_n2", + {"name" : "test_n2", "location" : "test_l", "client_role" : "test_cr", "type" : "message_router", "config_key" : "test_c", "aaf_username": "test_u", - "aaf_password": "test_p" + "aaf_password": "test_p" } ] return connections def test_validate_cons(): #test good streams - good_conn = _get_good_connection() + good_conn = _get_good_connection() _validate_conns(good_conn) #mutate @@ -85,3 +86,18 @@ def test_validate_cons(): with pytest.raises(BadConnections) as excinfo: _validate_conns(noloc) +def test_nonr_dec(): + def blow(): + d = {} + d["emptyinside"] += 1 + return d + #apply decorator + blow = try_raise_nonr(blow) + with pytest.raises(NonRecoverableError): + blow() + + def work(): + return 666 + work = try_raise_nonr(work) + assert work() == 666 + diff --git a/cdap/cdapplugin/tests/test_discovery.py b/cdap/cdapplugin/tests/test_discovery.py index 7ee59c4..7354f4e 100644 --- a/cdap/cdapplugin/tests/test_discovery.py +++ b/cdap/cdapplugin/tests/test_discovery.py @@ -63,7 +63,7 @@ def test_put_broker(monkeypatch): "test_se", "test_p", "test_pp", - logger) + logger = logger) assert R.text == "URL: http://666.666.666.666:666/application/test_scn, headers {'content-type': 'application/json'}" assert R.json == {'app_preferences': 'test_ap', 'services': 'test_se', 'namespace': 'test_ns', 'programs': 'test_p', 'cdap_application_type': 'program-flowlet', 'app_config': 'test_ac', 'streamname': 'test_sn', 'program_preferences': 'test_pp', 'artifact_name': 'test_an', 'jar_url': 'test_ju', 'artifact_version': 'test_av'} @@ -77,7 +77,7 @@ def test_reconfigure_in_broker(monkeypatch): _TEST_SCN, {"redome" : "baby"}, "program-flowlet-app-config", - logger) + logger = logger) assert R.text == "URL: http://666.666.666.666:666/application/test_scn/reconfigure, headers {'content-type': 'application/json'}" assert R.json == {'reconfiguration_type': 'program-flowlet-app-config', 'config': {'redome': 'baby'}} assert R.status_code == 200 @@ -88,8 +88,7 @@ def test_delete_on_broker(monkeypatch): R = discovery.delete_on_broker( _TEST_BROKER_NAME, _TEST_SCN, - logger) - print(R.text) + logger = logger) assert R.text == "URL: http://666.666.666.666:666/application/test_scn" assert R.status_code == 200 @@ -108,7 +107,7 @@ def test_multi_delete(monkeypatch): monkeypatch.setattr('requests.post', _fake_putpost) R = discovery.delete_all_registered_apps( _TEST_BROKER_NAME, - logger) + logger = logger) assert R.text == "URL: http://666.666.666.666:666/application/delete, headers {'content-type': 'application/json'}" assert R.status_code == 200 -- cgit 1.2.3-korg