diff options
Diffstat (limited to 'cdap/cdapplugin/cdapcloudify')
-rw-r--r-- | cdap/cdapplugin/cdapcloudify/__init__.py | 30 | ||||
-rw-r--r-- | cdap/cdapplugin/cdapcloudify/cdap_plugin.py | 231 | ||||
-rw-r--r-- | cdap/cdapplugin/cdapcloudify/discovery.py | 105 |
3 files changed, 366 insertions, 0 deletions
diff --git a/cdap/cdapplugin/cdapcloudify/__init__.py b/cdap/cdapplugin/cdapcloudify/__init__.py new file mode 100644 index 0000000..388ac55 --- /dev/null +++ b/cdap/cdapplugin/cdapcloudify/__init__.py @@ -0,0 +1,30 @@ +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +import logging + +def get_module_logger(mod_name): + logger = logging.getLogger(mod_name) + handler = logging.StreamHandler() + formatter = logging.Formatter( + '%(asctime)s [%(name)-12s] %(levelname)-8s %(message)s') + handler.setFormatter(formatter) + logger.addHandler(handler) + logger.setLevel(logging.DEBUG) + return logger diff --git a/cdap/cdapplugin/cdapcloudify/cdap_plugin.py b/cdap/cdapplugin/cdapcloudify/cdap_plugin.py new file mode 100644 index 0000000..f5eaf0b --- /dev/null +++ b/cdap/cdapplugin/cdapcloudify/cdap_plugin.py @@ -0,0 +1,231 @@ +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +import requests +from cloudify import ctx +from cloudify.decorators import operation +from cloudify.exceptions import NonRecoverableError +import time +import uuid +import re +from cdapcloudify import discovery +import json + +""" +TODO: Tons of crappy URL forming going on here... +""" +# Property keys +SERVICE_COMPONENT_NAME = "service_component_name" +SELECTED_BROKER = "selected_broker" +PUB_C = "streams_publishes_for_config" +SUB_C = "streams_subscribes_for_config" +SER_C = "services_calls_for_config" +STREAMS_PUBLISHES = "streams_publishes" +STREAMS_SUBSCRIBES = "streams_subscribes" +SERVICES_CALLS = "services_calls" + +# Custom Exception +class BadConnections(NonRecoverableError): + pass + + +def _validate_conns(connections): + """ + Cloudify allows you to type spec a data type in a type file, however it does not appear to do strict checking on blueprints against that. + Sad! + The "connections" block has an important structure to this plugin, so here we validate it and fail fast if it is not correct. + """ + try: + def _assert_ks_in_d(ks,d): + for k in ks: + assert(k in d) + assert STREAMS_PUBLISHES in connections + assert STREAMS_SUBSCRIBES in connections + for s in connections[STREAMS_PUBLISHES] + connections[STREAMS_SUBSCRIBES]: + _assert_ks_in_d(["name", "location", "type", "config_key"], s) + assert(s["type"] in ["message_router", "data_router"]) + if s["type"] == "message_router": + _assert_ks_in_d(["aaf_username", "aaf_password", "client_role"], s) #I am not checking that these are not blank. I will leave it possible for you to put empty values for these, but force you to acknowledge that you are doing so by not allowing these to be ommited. + #nothing extra for DR; no AAF, no client role. + except: + raise BadConnections("Bad Connections definition in blueprint") #is a NoneRecoverable + +def _streams_iterator(streams): + """ + helper function for iterating over streams_publishes and subscribes + note! this is an impure function. it also sets the properties the dmaap plugin needs into runtime properties + """ + for_config = {} + for s in streams: + if s["type"] == "message_router": + #set the properties the DMaaP plugin needs + ctx.instance.runtime_properties[s["name"]] = {"client_role" : s["client_role"], "location" : s["location"]} + #form (or append to) the dict the component will get, including the template for the CBS + for_config[s["config_key"]] = {"aaf_username" : s["aaf_username"], "aaf_password" : s["aaf_password"], "type" : s["type"], "dmaap_info" : "<< " + s["name"] + ">>"} #will get bound by CBS + if s["type"] == "data_router": + #set the properties the DMaaP plugin needs$ + ctx.instance.runtime_properties[s["name"]] = {"location" : s["location"]} + #form (or append to) the dict the component will get, including the template for the CBS$ + for_config[s["config_key"]] = {"type" : s["type"], "dmaap_info" : "<<" + s["name"] + ">>"} #will get bound by CBS + + return for_config + +def _services_calls_iterator(services_calls): + """ + helper function for iterating over services_calls + """ + for_config = {} + for s in services_calls: + #form (or append to) the dict the component will get, including the template for the CBS + for_config[s["config_key"]] = "{{ " + s["service_component_type"] + " }}" #will get bound by CBS + return for_config + +###################### +# Cloudify Operations +###################### + +@operation +def create(connected_broker_dns_name, **kwargs): + """ + This is apparantly needed due to the order in which Cloudify relationships are handled in Cloudify. + """ + + #fail fast + _validate_conns(ctx.node.properties["connections"]) + + #The config binding service needs to know whether cdap or docker. Currently (aug 1 2018) it looks for "cdap_app" in the name + service_component_name = "{0}_cdap_app_{1}".format(str(uuid.uuid4()).replace("-",""), ctx.node.properties["service_component_type"]) + + #set this into a runtime dictionary + ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = service_component_name + + #fetch the broker name from inputs and set it in runtime properties so other functions can use it + ctx.instance.runtime_properties[SELECTED_BROKER] = connected_broker_dns_name + + #set the properties the DMaap plugin expects for message router + #see the README for the structures of these keys + #NOTE! This has to be done in create because Jack's DMaaP plugin expects to do it's thing in preconfigure. + # and we need to get this key into consul before start + #set this as a runtime property for start to use + ctx.instance.runtime_properties[PUB_C] = _streams_iterator(ctx.node.properties["connections"][STREAMS_PUBLISHES]) + ctx.instance.runtime_properties[SUB_C] = _streams_iterator(ctx.node.properties["connections"][STREAMS_SUBSCRIBES]) + ctx.instance.runtime_properties[SER_C] = _services_calls_iterator(ctx.node.properties["connections"][SERVICES_CALLS]) + +@operation +def deploy_and_start_application(**kwargs): + """ + pushes the application into the workspace and starts it + """ + try: + #parse TOSCA model params + config_template = ctx.node.properties["app_config"] + + #there is a typed section in the node type called "connections", but the broker expects those two keys at the top level of app_config, so add them here + #In cloudify you can't have a custom data type and then specify unknown propertys, the vlidation will fail, so typespeccing just part of app_config doesnt work + #the rest of the CDAP app's app_config is app-dependent + config_template[SERVICES_CALLS] = ctx.instance.runtime_properties[SER_C] + config_template[STREAMS_PUBLISHES] = ctx.instance.runtime_properties[PUB_C] + config_template[STREAMS_SUBSCRIBES] = ctx.instance.runtime_properties[SUB_C] + + #register with broker + ctx.logger.info("Registering with Broker, config template was: {0}".format(json.dumps(config_template))) + discovery.put_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER], + service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME], + namespace = ctx.node.properties["namespace"], + streamname = ctx.node.properties["streamname"], + jar_url = ctx.node.properties["jar_url"], + artifact_name = ctx.node.properties["artifact_name"], + artifact_version = ctx.node.properties["artifact_version"], + app_config = config_template, + app_preferences = ctx.node.properties["app_preferences"], + service_endpoints = ctx.node.properties["service_endpoints"], + programs = ctx.node.properties["programs"], + program_preferences = ctx.node.properties["program_preferences"], + logger = ctx.logger) + + except Exception as e: + ctx.logger.error("Error depploying CDAP app: {er}".format(er=e)) + raise NonRecoverableError(e) + +@operation +def stop_and_undeploy_application(**kwargs): + #per jack Lucas, do not raise Nonrecoverables on any delete operation. Keep going on them all, cleaning up as much as you can. + #bombing would also bomb the deletion of the rest of the blueprint + ctx.logger.info("Undeploying CDAP application") + + try: #deregister with the broker, which will also take down the service from consul + discovery.delete_on_broker(ctx.instance.runtime_properties[SELECTED_BROKER], + ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME], + ctx.logger) + except Exception as e: + ctx.logger.error("Error deregistering from Broker, but continuing with deletion process: {0}".format(e)) + +############ +#RECONFIGURATION +# These calls works as follows: +# 1) it expects "new_config_template" to be a key in kwargs, i.e., passed in using execute_operations -p parameter +# 2) it pushes the new unbound config down to the broker +# 3) broker deals with the rest +############ +@operation +def app_config_reconfigure(new_config_template, **kwargs): + """ + reconfigure the CDAP app's app config + """ + try: + ctx.logger.info("Reconfiguring CDAP application via app_config") + discovery.reconfigure_in_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER], + service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME], + config = new_config_template, #This keyname will likely change per policy handler + reconfiguration_type = "program-flowlet-app-config", + logger = ctx.logger) + except Exception as e: + raise NonRecoverableError("CDAP Reconfigure error: {0}".format(e)) + +@operation +def app_preferences_reconfigure(new_config_template, **kwargs): + """ + reconfigure the CDAP app's app preferences + """ + try: + ctx.logger.info("Reconfiguring CDAP application via app_preferences") + discovery.reconfigure_in_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER], + service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME], + config = new_config_template, #This keyname will likely change per policy handler + reconfiguration_type = "program-flowlet-app-preferences", + logger = ctx.logger) + except Exception as e: + raise NonRecoverableError("CDAP Reconfigure error: {0}".format(e)) + +@operation +def app_smart_reconfigure(new_config_template, **kwargs): + """ + reconfigure the CDAP app via the broker smart interface + """ + try: + ctx.logger.info("Reconfiguring CDAP application via smart interface") + discovery.reconfigure_in_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER], + service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME], + config = new_config_template, #This keyname will likely change per policy handler + reconfiguration_type = "program-flowlet-smart", + logger = ctx.logger) + except Exception as e: + raise NonRecoverableError("CDAP Reconfigure error: {0}".format(e)) + + diff --git a/cdap/cdapplugin/cdapcloudify/discovery.py b/cdap/cdapplugin/cdapcloudify/discovery.py new file mode 100644 index 0000000..a8f0ce2 --- /dev/null +++ b/cdap/cdapplugin/cdapcloudify/discovery.py @@ -0,0 +1,105 @@ +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +import requests +import json + +CONSUL_HOST = "http://localhost:8500" + +def _get_broker_url(cdap_broker_name, service_component_name, logger): + """ + fetch the broker connection information from Consul + """ + def _get_connection_info_from_consul(service_component_name, logger): + """ + Call consul's catalog + TODO: currently assumes there is only one service + """ + url = "{0}/v1/catalog/service/{1}".format(CONSUL_HOST, service_component_name) + logger.info("Trying to query: {0}".format(url)) + res = requests.get(url) + res.raise_for_status() + services = res.json() + return services[0]["ServiceAddress"], services[0]["ServicePort"] + + broker_ip, broker_port = _get_connection_info_from_consul(cdap_broker_name, logger) + broker_url = "http://{ip}:{port}/application/{appname}".format(ip=broker_ip, port=broker_port, appname=service_component_name) + logger.info("Trying to connect to broker endpoint: {0}".format(broker_url)) + return broker_url + +""" +public +""" +def put_broker(cdap_broker_name, + service_component_name, + namespace, + streamname, + jar_url, + artifact_name, + artifact_version, + app_config, + app_preferences, + service_endpoints, + programs, + program_preferences, + logger): + """ + Conforms to Broker API 4.X + """ + + data = dict() + data["cdap_application_type"] = "program-flowlet" + data["namespace"] = namespace + data["streamname"] = streamname + data["jar_url"] = jar_url + data["artifact_name"] = artifact_name + data["artifact_version"] = artifact_version + data["app_config"] = app_config + data["app_preferences"] = app_preferences + data["services"] = service_endpoints + data["programs"] = programs + data["program_preferences"] = program_preferences + + #register with the broker + response = requests.put(_get_broker_url(cdap_broker_name, service_component_name, logger), + json = data, + headers = {'content-type':'application/json'}) + logger.info((response, response.status_code, response.text)) + response.raise_for_status() #bomb if not 2xx + +def reconfigure_in_broker(cdap_broker_name, + service_component_name, + config, + reconfiguration_type, + logger): + #trigger a reconfiguration with the broker + #man am I glad I broke the broker API from 3 to 4 to standardize this interface because now I only need one function here + response = requests.put("{u}/reconfigure".format(u = _get_broker_url(cdap_broker_name, service_component_name, logger)), + headers = {'content-type':'application/json'}, + json = {"reconfiguration_type" : reconfiguration_type, + "config" : config}) + logger.info((response, response.status_code, response.text)) + response.raise_for_status() #bomb if not 2xx + +def delete_on_broker(cdap_broker_name, service_component_name, logger): + #deregister with the broker + response = requests.delete(_get_broker_url(cdap_broker_name, service_component_name, logger)) + logger.info((response, response.status_code, response.text)) + response.raise_for_status() #bomb if not 2xx + |