summaryrefslogtreecommitdiffstats
path: root/cdap/cdapplugin
diff options
context:
space:
mode:
authorTommy Carpenter <tommy@research.att.com>2017-08-18 14:16:44 -0400
committerTommy Carpenter <tommy@research.att.com>2017-08-18 14:19:20 -0400
commitc272a87b84189f3bf8b3c7287199b816affc9926 (patch)
treeb8d67976ed993633dbb6310df40481c59c641860 /cdap/cdapplugin
parent2e71445141fcbe92e8e12ab24d065040d8de6d9c (diff)
[DCAEGEN2-74] Push up initial CDAP plugin
Change-Id: I2bad410e5c55a59950cabe8fc13066954c4f5c92 Signed-off-by: Tommy Carpenter <tommy@research.att.com>
Diffstat (limited to 'cdap/cdapplugin')
-rw-r--r--cdap/cdapplugin/cdapcloudify/__init__.py30
-rw-r--r--cdap/cdapplugin/cdapcloudify/cdap_plugin.py231
-rw-r--r--cdap/cdapplugin/cdapcloudify/discovery.py105
-rw-r--r--cdap/cdapplugin/requirements.txt1
-rw-r--r--cdap/cdapplugin/setup.py37
-rw-r--r--cdap/cdapplugin/tests/test_plugin.py87
-rw-r--r--cdap/cdapplugin/tox.ini8
7 files changed, 499 insertions, 0 deletions
diff --git a/cdap/cdapplugin/cdapcloudify/__init__.py b/cdap/cdapplugin/cdapcloudify/__init__.py
new file mode 100644
index 0000000..388ac55
--- /dev/null
+++ b/cdap/cdapplugin/cdapcloudify/__init__.py
@@ -0,0 +1,30 @@
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+import logging
+
+def get_module_logger(mod_name):
+ logger = logging.getLogger(mod_name)
+ handler = logging.StreamHandler()
+ formatter = logging.Formatter(
+ '%(asctime)s [%(name)-12s] %(levelname)-8s %(message)s')
+ handler.setFormatter(formatter)
+ logger.addHandler(handler)
+ logger.setLevel(logging.DEBUG)
+ return logger
diff --git a/cdap/cdapplugin/cdapcloudify/cdap_plugin.py b/cdap/cdapplugin/cdapcloudify/cdap_plugin.py
new file mode 100644
index 0000000..f5eaf0b
--- /dev/null
+++ b/cdap/cdapplugin/cdapcloudify/cdap_plugin.py
@@ -0,0 +1,231 @@
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+import requests
+from cloudify import ctx
+from cloudify.decorators import operation
+from cloudify.exceptions import NonRecoverableError
+import time
+import uuid
+import re
+from cdapcloudify import discovery
+import json
+
+"""
+TODO: Tons of crappy URL forming going on here...
+"""
+# Property keys
+SERVICE_COMPONENT_NAME = "service_component_name"
+SELECTED_BROKER = "selected_broker"
+PUB_C = "streams_publishes_for_config"
+SUB_C = "streams_subscribes_for_config"
+SER_C = "services_calls_for_config"
+STREAMS_PUBLISHES = "streams_publishes"
+STREAMS_SUBSCRIBES = "streams_subscribes"
+SERVICES_CALLS = "services_calls"
+
+# Custom Exception
+class BadConnections(NonRecoverableError):
+ pass
+
+
+def _validate_conns(connections):
+ """
+ Cloudify allows you to type spec a data type in a type file, however it does not appear to do strict checking on blueprints against that.
+ Sad!
+ The "connections" block has an important structure to this plugin, so here we validate it and fail fast if it is not correct.
+ """
+ try:
+ def _assert_ks_in_d(ks,d):
+ for k in ks:
+ assert(k in d)
+ assert STREAMS_PUBLISHES in connections
+ assert STREAMS_SUBSCRIBES in connections
+ for s in connections[STREAMS_PUBLISHES] + connections[STREAMS_SUBSCRIBES]:
+ _assert_ks_in_d(["name", "location", "type", "config_key"], s)
+ assert(s["type"] in ["message_router", "data_router"])
+ if s["type"] == "message_router":
+ _assert_ks_in_d(["aaf_username", "aaf_password", "client_role"], s) #I am not checking that these are not blank. I will leave it possible for you to put empty values for these, but force you to acknowledge that you are doing so by not allowing these to be ommited.
+ #nothing extra for DR; no AAF, no client role.
+ except:
+ raise BadConnections("Bad Connections definition in blueprint") #is a NoneRecoverable
+
+def _streams_iterator(streams):
+ """
+ helper function for iterating over streams_publishes and subscribes
+ note! this is an impure function. it also sets the properties the dmaap plugin needs into runtime properties
+ """
+ for_config = {}
+ for s in streams:
+ if s["type"] == "message_router":
+ #set the properties the DMaaP plugin needs
+ ctx.instance.runtime_properties[s["name"]] = {"client_role" : s["client_role"], "location" : s["location"]}
+ #form (or append to) the dict the component will get, including the template for the CBS
+ for_config[s["config_key"]] = {"aaf_username" : s["aaf_username"], "aaf_password" : s["aaf_password"], "type" : s["type"], "dmaap_info" : "<< " + s["name"] + ">>"} #will get bound by CBS
+ if s["type"] == "data_router":
+ #set the properties the DMaaP plugin needs$
+ ctx.instance.runtime_properties[s["name"]] = {"location" : s["location"]}
+ #form (or append to) the dict the component will get, including the template for the CBS$
+ for_config[s["config_key"]] = {"type" : s["type"], "dmaap_info" : "<<" + s["name"] + ">>"} #will get bound by CBS
+
+ return for_config
+
+def _services_calls_iterator(services_calls):
+ """
+ helper function for iterating over services_calls
+ """
+ for_config = {}
+ for s in services_calls:
+ #form (or append to) the dict the component will get, including the template for the CBS
+ for_config[s["config_key"]] = "{{ " + s["service_component_type"] + " }}" #will get bound by CBS
+ return for_config
+
+######################
+# Cloudify Operations
+######################
+
+@operation
+def create(connected_broker_dns_name, **kwargs):
+ """
+ This is apparantly needed due to the order in which Cloudify relationships are handled in Cloudify.
+ """
+
+ #fail fast
+ _validate_conns(ctx.node.properties["connections"])
+
+ #The config binding service needs to know whether cdap or docker. Currently (aug 1 2018) it looks for "cdap_app" in the name
+ service_component_name = "{0}_cdap_app_{1}".format(str(uuid.uuid4()).replace("-",""), ctx.node.properties["service_component_type"])
+
+ #set this into a runtime dictionary
+ ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = service_component_name
+
+ #fetch the broker name from inputs and set it in runtime properties so other functions can use it
+ ctx.instance.runtime_properties[SELECTED_BROKER] = connected_broker_dns_name
+
+ #set the properties the DMaap plugin expects for message router
+ #see the README for the structures of these keys
+ #NOTE! This has to be done in create because Jack's DMaaP plugin expects to do it's thing in preconfigure.
+ # and we need to get this key into consul before start
+ #set this as a runtime property for start to use
+ ctx.instance.runtime_properties[PUB_C] = _streams_iterator(ctx.node.properties["connections"][STREAMS_PUBLISHES])
+ ctx.instance.runtime_properties[SUB_C] = _streams_iterator(ctx.node.properties["connections"][STREAMS_SUBSCRIBES])
+ ctx.instance.runtime_properties[SER_C] = _services_calls_iterator(ctx.node.properties["connections"][SERVICES_CALLS])
+
+@operation
+def deploy_and_start_application(**kwargs):
+ """
+ pushes the application into the workspace and starts it
+ """
+ try:
+ #parse TOSCA model params
+ config_template = ctx.node.properties["app_config"]
+
+ #there is a typed section in the node type called "connections", but the broker expects those two keys at the top level of app_config, so add them here
+ #In cloudify you can't have a custom data type and then specify unknown propertys, the vlidation will fail, so typespeccing just part of app_config doesnt work
+ #the rest of the CDAP app's app_config is app-dependent
+ config_template[SERVICES_CALLS] = ctx.instance.runtime_properties[SER_C]
+ config_template[STREAMS_PUBLISHES] = ctx.instance.runtime_properties[PUB_C]
+ config_template[STREAMS_SUBSCRIBES] = ctx.instance.runtime_properties[SUB_C]
+
+ #register with broker
+ ctx.logger.info("Registering with Broker, config template was: {0}".format(json.dumps(config_template)))
+ discovery.put_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
+ service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
+ namespace = ctx.node.properties["namespace"],
+ streamname = ctx.node.properties["streamname"],
+ jar_url = ctx.node.properties["jar_url"],
+ artifact_name = ctx.node.properties["artifact_name"],
+ artifact_version = ctx.node.properties["artifact_version"],
+ app_config = config_template,
+ app_preferences = ctx.node.properties["app_preferences"],
+ service_endpoints = ctx.node.properties["service_endpoints"],
+ programs = ctx.node.properties["programs"],
+ program_preferences = ctx.node.properties["program_preferences"],
+ logger = ctx.logger)
+
+ except Exception as e:
+ ctx.logger.error("Error depploying CDAP app: {er}".format(er=e))
+ raise NonRecoverableError(e)
+
+@operation
+def stop_and_undeploy_application(**kwargs):
+ #per jack Lucas, do not raise Nonrecoverables on any delete operation. Keep going on them all, cleaning up as much as you can.
+ #bombing would also bomb the deletion of the rest of the blueprint
+ ctx.logger.info("Undeploying CDAP application")
+
+ try: #deregister with the broker, which will also take down the service from consul
+ discovery.delete_on_broker(ctx.instance.runtime_properties[SELECTED_BROKER],
+ ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
+ ctx.logger)
+ except Exception as e:
+ ctx.logger.error("Error deregistering from Broker, but continuing with deletion process: {0}".format(e))
+
+############
+#RECONFIGURATION
+# These calls works as follows:
+# 1) it expects "new_config_template" to be a key in kwargs, i.e., passed in using execute_operations -p parameter
+# 2) it pushes the new unbound config down to the broker
+# 3) broker deals with the rest
+############
+@operation
+def app_config_reconfigure(new_config_template, **kwargs):
+ """
+ reconfigure the CDAP app's app config
+ """
+ try:
+ ctx.logger.info("Reconfiguring CDAP application via app_config")
+ discovery.reconfigure_in_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
+ service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
+ config = new_config_template, #This keyname will likely change per policy handler
+ reconfiguration_type = "program-flowlet-app-config",
+ logger = ctx.logger)
+ except Exception as e:
+ raise NonRecoverableError("CDAP Reconfigure error: {0}".format(e))
+
+@operation
+def app_preferences_reconfigure(new_config_template, **kwargs):
+ """
+ reconfigure the CDAP app's app preferences
+ """
+ try:
+ ctx.logger.info("Reconfiguring CDAP application via app_preferences")
+ discovery.reconfigure_in_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
+ service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
+ config = new_config_template, #This keyname will likely change per policy handler
+ reconfiguration_type = "program-flowlet-app-preferences",
+ logger = ctx.logger)
+ except Exception as e:
+ raise NonRecoverableError("CDAP Reconfigure error: {0}".format(e))
+
+@operation
+def app_smart_reconfigure(new_config_template, **kwargs):
+ """
+ reconfigure the CDAP app via the broker smart interface
+ """
+ try:
+ ctx.logger.info("Reconfiguring CDAP application via smart interface")
+ discovery.reconfigure_in_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
+ service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
+ config = new_config_template, #This keyname will likely change per policy handler
+ reconfiguration_type = "program-flowlet-smart",
+ logger = ctx.logger)
+ except Exception as e:
+ raise NonRecoverableError("CDAP Reconfigure error: {0}".format(e))
+
+
diff --git a/cdap/cdapplugin/cdapcloudify/discovery.py b/cdap/cdapplugin/cdapcloudify/discovery.py
new file mode 100644
index 0000000..a8f0ce2
--- /dev/null
+++ b/cdap/cdapplugin/cdapcloudify/discovery.py
@@ -0,0 +1,105 @@
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+import requests
+import json
+
+CONSUL_HOST = "http://localhost:8500"
+
+def _get_broker_url(cdap_broker_name, service_component_name, logger):
+ """
+ fetch the broker connection information from Consul
+ """
+ def _get_connection_info_from_consul(service_component_name, logger):
+ """
+ Call consul's catalog
+ TODO: currently assumes there is only one service
+ """
+ url = "{0}/v1/catalog/service/{1}".format(CONSUL_HOST, service_component_name)
+ logger.info("Trying to query: {0}".format(url))
+ res = requests.get(url)
+ res.raise_for_status()
+ services = res.json()
+ return services[0]["ServiceAddress"], services[0]["ServicePort"]
+
+ broker_ip, broker_port = _get_connection_info_from_consul(cdap_broker_name, logger)
+ broker_url = "http://{ip}:{port}/application/{appname}".format(ip=broker_ip, port=broker_port, appname=service_component_name)
+ logger.info("Trying to connect to broker endpoint: {0}".format(broker_url))
+ return broker_url
+
+"""
+public
+"""
+def put_broker(cdap_broker_name,
+ service_component_name,
+ namespace,
+ streamname,
+ jar_url,
+ artifact_name,
+ artifact_version,
+ app_config,
+ app_preferences,
+ service_endpoints,
+ programs,
+ program_preferences,
+ logger):
+ """
+ Conforms to Broker API 4.X
+ """
+
+ data = dict()
+ data["cdap_application_type"] = "program-flowlet"
+ data["namespace"] = namespace
+ data["streamname"] = streamname
+ data["jar_url"] = jar_url
+ data["artifact_name"] = artifact_name
+ data["artifact_version"] = artifact_version
+ data["app_config"] = app_config
+ data["app_preferences"] = app_preferences
+ data["services"] = service_endpoints
+ data["programs"] = programs
+ data["program_preferences"] = program_preferences
+
+ #register with the broker
+ response = requests.put(_get_broker_url(cdap_broker_name, service_component_name, logger),
+ json = data,
+ headers = {'content-type':'application/json'})
+ logger.info((response, response.status_code, response.text))
+ response.raise_for_status() #bomb if not 2xx
+
+def reconfigure_in_broker(cdap_broker_name,
+ service_component_name,
+ config,
+ reconfiguration_type,
+ logger):
+ #trigger a reconfiguration with the broker
+ #man am I glad I broke the broker API from 3 to 4 to standardize this interface because now I only need one function here
+ response = requests.put("{u}/reconfigure".format(u = _get_broker_url(cdap_broker_name, service_component_name, logger)),
+ headers = {'content-type':'application/json'},
+ json = {"reconfiguration_type" : reconfiguration_type,
+ "config" : config})
+ logger.info((response, response.status_code, response.text))
+ response.raise_for_status() #bomb if not 2xx
+
+def delete_on_broker(cdap_broker_name, service_component_name, logger):
+ #deregister with the broker
+ response = requests.delete(_get_broker_url(cdap_broker_name, service_component_name, logger))
+ logger.info((response, response.status_code, response.text))
+ response.raise_for_status() #bomb if not 2xx
+
diff --git a/cdap/cdapplugin/requirements.txt b/cdap/cdapplugin/requirements.txt
new file mode 100644
index 0000000..1128300
--- /dev/null
+++ b/cdap/cdapplugin/requirements.txt
@@ -0,0 +1 @@
+uuid==1.30
diff --git a/cdap/cdapplugin/setup.py b/cdap/cdapplugin/setup.py
new file mode 100644
index 0000000..d16667d
--- /dev/null
+++ b/cdap/cdapplugin/setup.py
@@ -0,0 +1,37 @@
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+import os
+from setuptools import setup, find_packages
+
+setup(
+ name = "cdapcloudify",
+ version = "14.0.2",
+ packages=find_packages(),
+ author = "Tommy Carpenter",
+ author_email = "tommy at research dot eh tee tee dot com",
+ description = ("Cloudify plugin for CDAP"),
+ license = "",
+ keywords = "",
+ url = "https://gerrit.onap.org/r/#/admin/projects/dcaegen2/platform/plugins",
+ zip_safe=False,
+ install_requires = [
+ "uuid==1.30"
+ ]
+)
diff --git a/cdap/cdapplugin/tests/test_plugin.py b/cdap/cdapplugin/tests/test_plugin.py
new file mode 100644
index 0000000..7434fe8
--- /dev/null
+++ b/cdap/cdapplugin/tests/test_plugin.py
@@ -0,0 +1,87 @@
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+from cdapcloudify.cdap_plugin import _validate_conns, BadConnections
+import pytest
+
+#todo.. add more tests.. #shame
+
+def _get_good_connection():
+ connections = {}
+ connections["streams_publishes"] = [
+ {"name" : "test_n",
+ "location" : "test_l",
+ "client_role" : "test_cr",
+ "type" : "message_router",
+ "config_key" : "test_c",
+ "aaf_username": "test_u",
+ "aaf_password": "test_p"
+ },
+ {"name" : "test_n2",
+ "location" : "test_l",
+ "client_role" : "test_cr",
+ "type" : "message_router",
+ "config_key" : "test_c",
+ "aaf_username": "test_u",
+ "aaf_password": "test_p"
+ },
+ {"name" : "test_feed00",
+ "location" : "test_l",
+ "type" : "data_router",
+ "config_key" : "mydrconfigkey"
+ }
+ ]
+ connections["streams_subscribes"] = [
+ {"name" : "test_n",
+ "location" : "test_l",
+ "client_role" : "test_cr",
+ "type" : "message_router",
+ "config_key" : "test_c",
+ "aaf_username": "test_u",
+ "aaf_password": "test_p"
+ },
+ {"name" : "test_n2",
+ "location" : "test_l",
+ "client_role" : "test_cr",
+ "type" : "message_router",
+ "config_key" : "test_c",
+ "aaf_username": "test_u",
+ "aaf_password": "test_p"
+ }
+ ]
+ return connections
+
+def test_validate_cons():
+ #test good streams
+ good_conn = _get_good_connection()
+ _validate_conns(good_conn)
+
+ #mutate
+ nosub = _get_good_connection().pop("streams_subscribes")
+ with pytest.raises(BadConnections) as excinfo:
+ _validate_conns(nosub)
+
+ nopub = _get_good_connection().pop("streams_publishes")
+ with pytest.raises(BadConnections) as excinfo:
+ _validate_conns(nopub)
+
+ noloc = _get_good_connection()["streams_publishes"][0].pop("location")
+ with pytest.raises(BadConnections) as excinfo:
+ _validate_conns(noloc)
+
diff --git a/cdap/cdapplugin/tox.ini b/cdap/cdapplugin/tox.ini
new file mode 100644
index 0000000..afabca4
--- /dev/null
+++ b/cdap/cdapplugin/tox.ini
@@ -0,0 +1,8 @@
+[tox]
+envlist = py27
+[testenv]
+deps=
+ pytest
+ uuid==1.30
+ cloudify==3.4
+commands=pytest