diff options
author | Jack Lucas <jflucas@research.att.com> | 2017-09-01 13:48:08 +0000 |
---|---|---|
committer | Jack Lucas <jflucas@research.att.com> | 2017-09-01 13:50:18 +0000 |
commit | 0619cb3772a12e0a5a3aecc09b96eeb7df20a000 (patch) | |
tree | b8a921e0e41578163f7c2db41450301c374361df /dmaap/dmaapplugin | |
parent | ac3f029edc6bc8b25ab44666b5408c8929e6ba92 (diff) |
Add seed code for DMaaP plugin
Change-Id: I8c7a9c432badd3052a571ed87b9b580760b376e6
Issue-Id: CCSDK-65
Signed-off-by: Jack Lucas <jflucas@research.att.com>
Diffstat (limited to 'dmaap/dmaapplugin')
-rw-r--r-- | dmaap/dmaapplugin/__init__.py | 46 | ||||
-rw-r--r-- | dmaap/dmaapplugin/dmaaputils.py | 28 | ||||
-rw-r--r-- | dmaap/dmaapplugin/dr_bridge.py | 198 | ||||
-rw-r--r-- | dmaap/dmaapplugin/dr_lifecycle.py | 121 | ||||
-rw-r--r-- | dmaap/dmaapplugin/dr_relationships.py | 211 | ||||
-rw-r--r-- | dmaap/dmaapplugin/mr_lifecycle.py | 121 | ||||
-rw-r--r-- | dmaap/dmaapplugin/mr_relationships.py | 119 |
7 files changed, 844 insertions, 0 deletions
diff --git a/dmaap/dmaapplugin/__init__.py b/dmaap/dmaapplugin/__init__.py new file mode 100644 index 0000000..130c0bf --- /dev/null +++ b/dmaap/dmaapplugin/__init__.py @@ -0,0 +1,46 @@ +# ============LICENSE_START==================================================== +# org.onap.ccsdk +# ============================================================================= +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +## Get parameters for accessing the DMaaP controller +from consulif.consulif import ConsulHandle +from cloudify.exceptions import NonRecoverableError + +CONSUL_HOST = "127.0.0.1" # Should always be a local consul agent on Cloudify Manager +CM_SERVICE_NAME = "cloudify_manager" # Name under which CM is registered, used as key to get config +DBC_SERVICE_NAME= "dmaap_bus_controller" # Name under which the DMaaP bus controller is registered + +try: + _ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, None) + config = _ch.get_config(CM_SERVICE_NAME) + DMAAP_USER = config['dmaap']['username'] + DMAAP_PASS = config['dmaap']['password'] + DMAAP_OWNER = config['dmaap']['owner'] + if 'protocol' in config['dmaap']: + DMAAP_PROTOCOL = config['dmaap']['protocol'] + else: + DMAAP_PROTOCOL = 'https' # Default to https (service discovery should give us this but doesn't + if 'path' in config['dmaap']: + DMAAP_PATH = config['dmaap']['path'] + else: + DMAAP_PATH = 'webapi' # SHould come from service discovery but Consul doesn't support it + + service_address, service_port = _ch.get_service(DBC_SERVICE_NAME) + DMAAP_API_URL = '{0}://{1}:{2}/{3}'.format(DMAAP_PROTOCOL, service_address, service_port, DMAAP_PATH) + +except Exception as e: + raise NonRecoverableError("Error configuring dmaap plugin: {0}".format(e)) diff --git a/dmaap/dmaapplugin/dmaaputils.py b/dmaap/dmaapplugin/dmaaputils.py new file mode 100644 index 0000000..e043a07 --- /dev/null +++ b/dmaap/dmaapplugin/dmaaputils.py @@ -0,0 +1,28 @@ +# ============LICENSE_START==================================================== +# org.onap.ccsdk +# ============================================================================= +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +# Utility functions + +import string +import random + +def random_string(n): + ''' + Create a random alphanumeric string, n characters long. + ''' + return ''.join(random.choice(string.ascii_lowercase + string.ascii_uppercase + string.digits) for x in range(n)) diff --git a/dmaap/dmaapplugin/dr_bridge.py b/dmaap/dmaapplugin/dr_bridge.py new file mode 100644 index 0000000..4e0df4d --- /dev/null +++ b/dmaap/dmaapplugin/dr_bridge.py @@ -0,0 +1,198 @@ +# ============LICENSE_START==================================================== +# org.onap.ccsdk +# ============================================================================= +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +from cloudify import ctx +from cloudify.decorators import operation +from cloudify.exceptions import NonRecoverableError +from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS +from dmaaputils import random_string +from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle + +# Set up a subscriber to a source feed +def _set_up_subscriber(dmc, source_feed_id, loc, delivery_url, username, userpw): + # Add subscriber to source feed + add_sub = dmc.add_subscriber(source_feed_id, loc, delivery_url, username, userpw) + add_sub.raise_for_status() + return add_sub.json() + +# Set up a publisher to a target feed +def _set_up_publisher(dmc, target_feed_id, loc): + username = random_string(8) + userpw = random_string(10) + add_pub = dmc.add_publisher(target_feed_id, loc, username, userpw) + add_pub.raise_for_status() + pub_info = add_pub.json() + return pub_info["pubId"], username, userpw + +# Get a central location to use when creating a publisher or subscriber +def _get_central_location(dmc): + locations = dmc.get_dcae_locations('opendcae-central') + if len(locations) < 1: + raise Exception('No central location found for setting up DR bridging') + return locations[0] # We take the first one. Typically there will be two central locations + + +# Set up a "bridge" between two feeds internal to DCAE +# A source feed "bridges_to" a target feed, meaning that anything published to +# the source feed will be delivered to subscribers to the target feed (as well as +# to subscribers of the source feed). +# +# The bridge is established by first adding a publisher to the target feed. The result of doing this +# is a publish URL and a set of publication credentials. +#The publish URL and publication credentials are used to set up a subscriber to the source feed. +#I.e., we tell the source feed to deliver to an endpoint which is actually a publish +# endpoint for the target feed. +@operation +def create_dr_bridge(**kwargs): + + try: + + # Get source and target feed ids + if 'feed_id' in ctx.target.instance.runtime_properties: + target_feed_id = ctx.target.instance.runtime_properties['feed_id'] + else: + raise Exception('Target feed has no feed_id property') + if 'feed_id' in ctx.source.instance.runtime_properties: + source_feed_id = ctx.source.instance.runtime_properties['feed_id'] + else: + raise Exception('Source feed has no feed_id property') + + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + + # Get a location to use when creating a publisher or subscriber--a central location seems reasonable + loc = _get_central_location(dmc) + + ctx.logger.info('Creating bridge from feed {0} to feed {1} using location {2}'.format(source_feed_id, target_feed_id, loc)) + + # Add publisher to target feed + publisher_id, username, userpw = _set_up_publisher(dmc, target_feed_id, loc) + ctx.logger.info("Added publisher id {0} to target feed {1} with user {2}".format(publisher_id, target_feed_id, username)) + + # Add subscriber to source feed + delivery_url = ctx.target.instance.runtime_properties['publish_url'] + subscriber_info = _set_up_subscriber(dmc, source_feed_id, loc, delivery_url, username, userpw) + subscriber_id = subscriber_info["subId"] + ctx.logger.info("Added subscriber id {0} to source feed {1} with delivery url {2}".format(subscriber_id, source_feed_id, delivery_url)) + + # Save the publisher and subscriber IDs on the source node, indexed by the target node id + ctx.source.instance.runtime_properties[ctx.target.node.id] = {"publisher_id": publisher_id, "subscriber_id": subscriber_id} + + except Exception as e: + ctx.logger.error("Error creating bridge: {0}".format(e)) + raise NonRecoverableError(e) + +# Set up a bridge from an internal DCAE feed to a feed in an external Data Router system +# The target feed needs to be provisioned in the external Data Router system. A publisher +# to that feed must also be set up in the external Data Router system. The publish URL, +# username, and password need to be captured in a target node of type dcae.nodes.ExternalTargetFeed. +# The bridge is established by setting up a subscriber to the internal DCAE source feed using the +# external feed publisher parameters as delivery parameters for the subscriber. +@operation +def create_external_dr_bridge(**kwargs): + try: + + # Make sure target feed has full set of properties + if 'url' in ctx.target.node.properties and 'username' in ctx.target.node.properties and 'userpw' in ctx.target.node.properties: + url = ctx.target.node.properties['url'] + username = ctx.target.node.properties['username'] + userpw = ctx.target.node.properties['userpw'] + else: + raise Exception ("Target feed missing url, username, and/or user pw") + + # Make sure source feed has a feed ID + if 'feed_id' in ctx.source.instance.runtime_properties: + source_feed_id = ctx.source.instance.runtime_properties['feed_id'] + else: + raise Exception('Source feed has no feed_id property') + + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + + # Get a central location to use when creating subscriber + loc = _get_central_location(dmc) + + ctx.logger.info('Creating external bridge from feed {0} to external url {1} using location {2}'.format(source_feed_id, url, loc)) + + # Create subscription to source feed using properties of the external target feed + subscriber_info = _set_up_subscriber(dmc, source_feed_id, loc, url, username, userpw) + subscriber_id = subscriber_info["subId"] + ctx.logger.info("Added subscriber id {0} to source feed {1} with delivery url {2}".format(subscriber_id, source_feed_id, url)) + + # Save the subscriber ID on the source node, indexed by the target node id + ctx.source.instance.runtime_properties[ctx.target.node.id] = {"subscriber_id": subscriber_id} + + except Exception as e: + ctx.logger.error("Error creating external bridge: {0}".format(e)) + raise NonRecoverableError(e) + +# Set up a bridge from a feed in an external Data Router system to an internal DCAE feed. +# The bridge is established by creating a publisher on the internal DCAE feed. Then a subscription +# to the external feed is created through manual provisioning in the external Data Router system, using +# the publish URL and the publisher username and password for the internal feed as the delivery parameters +# for the external subscription. +# In order to obtain the publish URL, publisher username, and password, a blueprint using this sort of +# bridge will typically have an output that exposes the runtime_property set on the source node in this operation. +@operation +def create_external_source_dr_bridge(**kwargs): + try: + # Get target feed id + if 'feed_id' in ctx.target.instance.runtime_properties: + target_feed_id = ctx.target.instance.runtime_properties['feed_id'] + else: + raise Exception('Target feed has no feed_id property') + + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + + # Get a central location to use when creating a publisher + loc = _get_central_location(dmc) + + # Create a publisher on the target feed + publisher_id, username, userpw = _set_up_publisher(dmc, target_feed_id, loc) + + # Save the publisher info on the source node, indexed by the target node + ctx.source.instance.runtime_properties[ctx.target.node.id] = {"publisher_id": publisher_id, "url": ctx.target.instance.runtime_properties["publish_url"], "username": username, "userpw": userpw} + + except Exception as e: + ctx.logger.error("Error creating external source bridge: {0}".format(e)) + +# Remove the bridge between the relationship source and target. +# For a bridge between 2 internal feeds, deletes the subscriber on the source feed and the publisher on the target feed. +# For a bridge to an external target feed, deletes the subscriber on the source feed. +# For a bridge from an external source feed, deletes the publisher on the target feed. +@operation +def remove_dr_bridge(**kwargs): + try: + + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + + if ctx.target.node.id in ctx.source.instance.runtime_properties: + + if 'subscriber_id' in ctx.source.instance.runtime_properties[ctx.target.node.id]: + # Delete the subscription for this bridge + ctx.logger.info("Removing bridge -- deleting subscriber {0}".format(ctx.source.instance.runtime_properties[ctx.target.node.id]['subscriber_id'])) + dmc.delete_subscriber(ctx.source.instance.runtime_properties[ctx.target.node.id]['subscriber_id']) + + if 'publisher_id' in ctx.source.instance.runtime_properties: + # Delete the publisher for this bridge + ctx.logger.info("Removing bridge -- deleting publisher {0}".format(ctx.source.instance.runtime_properties[ctx.target.node.id]['publisher_id'])) + dmc.delete_publisher(ctx.source.instance.runtime_properties[ctx.target.node.id]['publisher_id']) + + ctx.logger.info("Remove bridge from {0} to {1}".format(ctx.source.node.id, ctx.target.node.id)) + + except Exception as e: + ctx.logger.error("Error removing bridge: {0}".format(e)) + # Let the uninstall workflow proceed--don't throw a NonRecoverableError diff --git a/dmaap/dmaapplugin/dr_lifecycle.py b/dmaap/dmaapplugin/dr_lifecycle.py new file mode 100644 index 0000000..45f8674 --- /dev/null +++ b/dmaap/dmaapplugin/dr_lifecycle.py @@ -0,0 +1,121 @@ +# ============LICENSE_START==================================================== +# org.onap.ccsdk +# ============================================================================= +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +from cloudify import ctx +from cloudify.decorators import operation +from cloudify.exceptions import NonRecoverableError +from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, DMAAP_OWNER +from dmaaputils import random_string +from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle + +# Lifecycle operations for DMaaP Data Router feeds + +@operation +def create_feed(**kwargs): + ''' + Create a new data router feed + Expects "feed_name" to be set in node properties + Allows "feed_version", "feed_description", and "aspr_classification" as optional properties + (Sets default values if not provided ) + Sets instance runtime properties: + - "feed_id" + - "publish_url" + - "log_url" + + ''' + try: + # Make sure there's a feed_name + if "feed_name" in ctx.node.properties.keys(): + feed_name = ctx.node.properties["feed_name"] + else: + feed_name = random_string(12) + + # Set defaults/placeholders for the optional properties for the feed + if "feed_version" in ctx.node.properties.keys(): + feed_version = ctx.node.properties["feed_version"] + else: + feed_version = "0.0" + if "feed_description" in ctx.node.properties.keys(): + feed_description = ctx.node.properties["feed_description"] + else: + feed_description = "No description provided" + if "aspr_classification" in ctx.node.properties.keys(): + aspr_classification = ctx.node.properties["aspr_classification"] + else: + aspr_classification = "unclassified" + + # Make the request to the controller + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + ctx.logger.info("Attempting to create feed name {0}".format(feed_name)) + f = dmc.create_feed(feed_name, feed_version, feed_description, aspr_classification, DMAAP_OWNER) + f.raise_for_status() + + # Capture important properties from the result + feed = f.json() + ctx.instance.runtime_properties["feed_id"] = feed["feedId"] + ctx.instance.runtime_properties["publish_url"] = feed["publishURL"] + ctx.instance.runtime_properties["log_url"] = feed["logURL"] + ctx.logger.info("Created feed name {0} with feed id {1}".format(feed_name, feed["feedId"])) + + except Exception as e: + ctx.logger.error("Error creating feed: {er}".format(er=e)) + raise NonRecoverableError(e) + +@operation +def get_existing_feed(**kwargs): + ''' + Find information for an existing data router feed + Expects "feed_id" to be set in node properties -- uniquely identifies the feed + Sets instance runtime properties: + - "feed_id" + - "publish_url" + - "log_url" + ''' + try: + # Make the lookup request to the controller + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + f = dmc.get_feed_info(ctx.node.properties["feed_id"]) + f.raise_for_status() + + # Capture important properties from the result + feed = f.json() + ctx.instance.runtime_properties["feed_id"] = ctx.node.properties["feed_id"] # Just to be consistent with newly-created node, above + ctx.instance.runtime_properties["publish_url"] = feed["publishURL"] + ctx.instance.runtime_properties["log_url"] = feed["logURL"] + ctx.logger.info("Found existing feed with feed id {0}".format(ctx.node.properties["feed_id"])) + + except Exception as e: + ctx.logger.error("Error getting existing feed id {id}: {er}".format(id=ctx.node.properties["feed_id"],er=e)) + raise NonRecoverableError(e) + +@operation +def delete_feed(**kwargs): + ''' + Delete a feed + Expects "feed_id" to be set on the instance's runtime properties + ''' + try: + # Make the lookup request to the controllerid=ctx.node.properties["feed_id"] + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + f = dmc.delete_feed(ctx.instance.runtime_properties["feed_id"]) + f.raise_for_status() + ctx.logger.info("Deleting feed id {0}".format(ctx.instance.runtime_properties["feed_id"])) + + except Exception as e: + ctx.logger.error("Error deleting feed id {id}: {er}".format(id=ctx.instance.runtime_properties["feed_id"],er=e)) + # don't raise a NonRecoverable error here--let the uninstall workflow continue diff --git a/dmaap/dmaapplugin/dr_relationships.py b/dmaap/dmaapplugin/dr_relationships.py new file mode 100644 index 0000000..8796354 --- /dev/null +++ b/dmaap/dmaapplugin/dr_relationships.py @@ -0,0 +1,211 @@ +# ============LICENSE_START==================================================== +# org.onap.ccsdk +# ============================================================================= +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +from cloudify import ctx +from cloudify.decorators import operation +from cloudify.exceptions import NonRecoverableError +from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, CONSUL_HOST +from dmaaputils import random_string +from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle +from consulif.consulif import ConsulHandle + +# Lifecycle operations for DMaaP Data Router +# publish and subscribe relationships + +@operation +def add_dr_publisher(**kwargs): + ''' + Sets up the source of the publishes_relationship as a publisher to the feed that + is the target of the relationship + Assumes target (the feed) has the following runtime properties set + - feed_id + - log_url + - publish_url + Assumes source (the publisher) has a runtime property whose name matches the node name of the feed. + This is a dictionary containing one property: + - location (the dcaeLocationName to pass when adding the publisher to the feed) + Generates a user name and password that the publisher will need to use when publishing + Adds the following properties to the dictionary above: + - publish_url + - log_url + - username + - password + ''' + try: + # Make sure we have a name under which to store DMaaP configuration + # Check early so we don't needlessly create DMaaP entities + if 'service_component_name' not in ctx.source.instance.runtime_properties: + raise Exception("Source node does not have 'service_component_name' in runtime_properties") + + target_feed = ctx.target.node.id + ctx.logger.info("Attempting to add publisher {0} to feed {1}".format(ctx.source.node.id, target_feed)) + + # Set up the parameters for the add_publisher request to the DMaaP bus controller + feed_id = ctx.target.instance.runtime_properties["feed_id"] + location = ctx.source.instance.runtime_properties[target_feed]["location"] + username = random_string(8) + password = random_string(10) + + # Make the request to add the publisher to the feed + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + add_pub = dmc.add_publisher(feed_id, location, username, password) + add_pub.raise_for_status() + publisher_info = add_pub.json() + publisher_id = publisher_info["pubId"] + ctx.logger.info("Added publisher id {0} to feed {1} at {2}, with user {3}, pass {4}".format(publisher_id, feed_id, location, username, password)) + + # Set runtime properties on the source + ctx.source.instance.runtime_properties[target_feed] = { + "publisher_id" : publisher_id, + "location" : location, + "publish_url" : ctx.target.instance.runtime_properties["publish_url"], + "log_url" : ctx.target.instance.runtime_properties["log_url"], + "username" : username, + "password" : password + } + + # Set key in Consul + ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger) + ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_feed, ctx.source.instance.runtime_properties[target_feed]) + + except Exception as e: + ctx.logger.error("Error adding publisher to feed: {er}".format(er=e)) + raise NonRecoverableError(e) + + +@operation +def delete_dr_publisher(**kwargs): + ''' + Deletes publisher (the source of the publishes_files relationship) + from the feed (the target of the relationship). + Assumes that the 'publisher_id' property was added to the dictionary of feed-related properties, + when the publisher was added to the feed. + ''' + + try: + # Make sure we have a name under which to store DMaaP configuration + # Check early so we don't needlessly create DMaaP entities + if 'service_component_name' not in ctx.source.instance.runtime_properties: + raise Exception("Source node does not have 'service_component_name' in runtime_properties") + + # Get the publisher id + target_feed = ctx.target.node.id + publisher_id = ctx.source.instance.runtime_properties[target_feed]["publisher_id"] + ctx.logger.info("Attempting to delete publisher {0}".format(publisher_id, target_feed)) + + # Make the request + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + del_result = dmc.delete_publisher(publisher_id) + del_result.raise_for_status() + + ctx.logger.info("Deleted publisher {0}".format(publisher_id)) + + # Attempt to remove the entire ":dmaap" entry from the Consul KV store + # Will quietly do nothing if the entry has already been removed + ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger) + ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name'])) + + except Exception as e: + ctx.logger.error("Error deleting publisher: {er}".format(er=e)) + # don't raise a NonRecoverable error here--let the uninstall workflow continue + + +@operation +def add_dr_subscriber(**kwargs): + ''' + Sets up the source of the subscribes_to_files relationship as a subscriber to the + feed that is the target of the relationship. + Assumes target (the feed) has the following runtime property set + - feed_id + Assumes source (the subscriber) has a runtime property whose name matches the node name of the feed. + This is a dictionary containing the following properties: + - location (the dcaeLocationName to pass when adding the publisher to the feed) + - delivery_url (the URL to which data router will deliver files) + - username (the username data router will use when delivering files) + - password (the password data router will use when delivering files) + Adds a property to the dictionary above: + - subscriber_id (used to delete the subscriber in the uninstall workflow + ''' + try: + target_feed = ctx.target.node.id + ctx.logger.info("Attempting to add subscriber {0} to feed {1}".format(ctx.source.node.id, target_feed)) + + # Get the parameters for the call + feed_id = ctx.target.instance.runtime_properties["feed_id"] + location = ctx.source.instance.runtime_properties[target_feed]["location"] + delivery_url = ctx.source.instance.runtime_properties[target_feed]["delivery_url"] + username = ctx.source.instance.runtime_properties[target_feed]["username"] + password = ctx.source.instance.runtime_properties[target_feed]["password"] + + # Make the request to add the subscriber to the feed + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + add_sub = dmc.add_subscriber(feed_id, location, delivery_url,username, password) + add_sub.raise_for_status() + subscriber_info = add_sub.json() + subscriber_id = subscriber_info["subId"] + ctx.logger.info("Added subscriber id {0} to feed {1} at {2}".format(subscriber_id, feed_id, location)) + + # Add subscriber_id to the runtime properties + # ctx.source.instance.runtime_properties[target_feed]["subscriber_id"] = subscriber_id + ctx.source.instance.runtime_properties[target_feed] = { + "subscriber_id": subscriber_id, + "location" : location, + "delivery_url" : delivery_url, + "username" : username, + "password" : password + } + ctx.logger.info("on source: {0}".format(ctx.source.instance.runtime_properties[target_feed])) + + # Set key in Consul + ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger) + ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_feed, ctx.source.instance.runtime_properties[target_feed]) + + except Exception as e: + ctx.logger.error("Error adding subscriber to feed: {er}".format(er=e)) + raise NonRecoverableError(e) + + +@operation +def delete_dr_subscriber(**kwargs): + ''' + Deletes subscriber (the source of the subscribes_to_files relationship) + from the feed (the target of the relationship). + Assumes that the source node's runtime properties dictionary for the target feed + includes 'subscriber_id', set when the publisher was added to the feed. + ''' + try: + # Get the subscriber id + target_feed = ctx.target.node.id + subscriber_id = ctx.source.instance.runtime_properties[target_feed]["subscriber_id"] + ctx.logger.info("Attempting to delete subscriber {0} from feed {1}".format(subscriber_id, target_feed)) + + # Make the request + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + del_result = dmc.delete_subscriber(subscriber_id) + del_result.raise_for_status() + + ctx.logger.info("Deleted subscriber {0}".format(subscriber_id)) + + # Attempt to remove the entire ":dmaap" entry from the Consul KV store + # Will quietly do nothing if the entry has already been removed + ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger) + ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name'])) + + except Exception as e: + ctx.logger.error("Error deleting subscriber: {er}".format(er=e)) + # don't raise a NonRecoverable error here--let the uninstall workflow continue diff --git a/dmaap/dmaapplugin/mr_lifecycle.py b/dmaap/dmaapplugin/mr_lifecycle.py new file mode 100644 index 0000000..16ad953 --- /dev/null +++ b/dmaap/dmaapplugin/mr_lifecycle.py @@ -0,0 +1,121 @@ +# ============LICENSE_START==================================================== +# org.onap.ccsdk +# ============================================================================= +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +from cloudify import ctx +from cloudify.decorators import operation +from cloudify.exceptions import NonRecoverableError +from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, DMAAP_OWNER +from dmaaputils import random_string +from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle + +# Lifecycle operations for DMaaP Message Router topics +@operation +def create_topic(**kwargs): + ''' + Creates a message router topic. + Allows 'topic_name', 'topic_description', 'txenable', 'replication_case', + and 'global_mr_url' as optional node properties. If 'topic_name' is not set, + generates a random one. + Sets 'fqtn' in the instance runtime_properties. + Note that 'txenable' is a Message Router flag indicating whether transactions + are enabled on the topic. + ''' + try: + # Make sure there's a topic_name + if "topic_name" in ctx.node.properties: + topic_name = ctx.node.properties["topic_name"] + else: + topic_name = random_string(12) + + # Make sure there's a topic description + if "topic_description" in ctx.node.properties: + topic_description = ctx.node.properties["topic_description"] + else: + topic_description = "No description provided" + + # ..and the truly optional setting + if "txenable" in ctx.node.properties: + txenable = ctx.node.properties["txenable"] + else: + txenable= False + + if "replication_case" in ctx.node.properties: + replication_case = ctx.node.properties["replication_case"] + else: + replication_case = None + + if "global_mr_url" in ctx.node.properties: + global_mr_url = ctx.node.properties["global_mr_url"] + else: + global_mr_url = None + + + # Make the request to the controller + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + ctx.logger.info("Attempting to create topic name {0}".format(topic_name)) + t = dmc.create_topic(topic_name, topic_description, txenable, DMAAP_OWNER, replication_case, global_mr_url) + t.raise_for_status() + + # Capture important properties from the result + topic = t.json() + ctx.instance.runtime_properties["fqtn"] = topic["fqtn"] + + except Exception as e: + ctx.logger.error("Error creating topic: {er}".format(er=e)) + raise NonRecoverableError(e) + +@operation +def get_existing_topic(**kwargs): + ''' + Get data for an existing feed. + Expects 'fqtn' as a node property. + Copies this property to 'fqtn' in runtime properties for consistency + with a newly-created topic. + While there's no real need to make a call to the DMaaP bus controller, + we do so just to make sure the fqtn is known to the controller, so we + don't run into problems when we try to add a publisher or subscriber later. + ''' + try: + fqtn = ctx.node.properties["fqtn"] + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + ctx.logger.info("Attempting to get info for existing topic {0}".format(fqtn)) + t = dmc.get_topic_info(fqtn) + t.raise_for_status() + + ctx.instance.runtime_properties["fqtn"] = ctx.node.properties["fqtn"] + + except Exception as e: + ctx.logger.error("Error getting existing topic: {er}".format(er=e)) + raise NonRecoverableError(e) + +@operation +def delete_topic(**kwargs): + ''' + Delete the topic. Expects the instance runtime property "fqtn" to have been + set when the topic was created. + ''' + try: + fqtn = ctx.instance.runtime_properties["fqtn"] + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + ctx.logger.info("Attempting to delete topic {0}".format(fqtn)) + t = dmc.delete_topic(fqtn) + t.raise_for_status() + + except Exception as e: + ctx.logger.error("Error getting existing topic: {er}".format(er=e)) + # don't raise a NonRecoverable error here--let the uninstall workflow continue diff --git a/dmaap/dmaapplugin/mr_relationships.py b/dmaap/dmaapplugin/mr_relationships.py new file mode 100644 index 0000000..ff92d67 --- /dev/null +++ b/dmaap/dmaapplugin/mr_relationships.py @@ -0,0 +1,119 @@ +# ============LICENSE_START==================================================== +# org.onap.ccsdk +# ============================================================================= +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +from cloudify import ctx +from cloudify.decorators import operation +from cloudify.exceptions import NonRecoverableError +from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, DMAAP_OWNER, CONSUL_HOST +from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle +from consulif.consulif import ConsulHandle + +# Message router relationship operations + +def _add_mr_client(ctype, actions): + ''' + Adds the node represented by 'source' as a client (publisher or subscriber) to + to topic represented by the 'target' node. The list of actions in 'actions' + determines whether the client is a subscriber or a publisher. + + Assumes target (the topic) has the following runtime property set + - fqtn + Assumes source (the client) has a runtime property whose name matches the node name of the feed. + This is a dictionary containing the following properties: + - location (the dcaeLocationName to pass when adding the client to the topic) + - client_role (the AAF client role under which the client will access the topic) + Adds two properties to the dictionary above: + - topic_url (the URL that the client can use to access the topic) + - client_id (used to delete the client in the uninstall workflow) + ''' + try: + # Make sure we have a name under which to store DMaaP configuration + # Check early so we don't needlessly create DMaaP entities + if 'service_component_name' not in ctx.source.instance.runtime_properties: + raise Exception("Source node does not have 'service_component_name' in runtime_properties") + + target_topic = ctx.target.node.id # Key for the source's dictionary with topic-related info + fqtn = ctx.target.instance.runtime_properties["fqtn"] + ctx.logger.info("Attempting to add {0} as {1} to topic {2}".format(ctx.source.node.id, ctype, fqtn)) + + # Get the parameters needed for adding the client + location = ctx.source.instance.runtime_properties[target_topic]["location"] + client_role = ctx.source.instance.runtime_properties[target_topic]["client_role"] + + # Make the request to add the client to the topic + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + c = dmc.create_client(fqtn, location, client_role, actions) + c.raise_for_status() + client_info = c.json() + client_id = client_info["mrClientId"] + topic_url = client_info["topicURL"] + + # Update source's runtime properties + #ctx.source.instance.runtime_properties[target_topic]["topic_url"] = topic_url + #ctx.source.instance.runtime_properties[target_topic]["client_id"] = client_id + ctx.source.instance.runtime_properties[target_topic] = { + "topic_url" : topic_url, + "client_id" : client_id, + "location" : location, + "client_role" : client_role + } + + ctx.logger.info("Added {0} id {1} to feed {2} at {3}".format(ctype, client_id, fqtn, location)) + + # Set key in Consul + ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger) + ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_topic, ctx.source.instance.runtime_properties[target_topic]) + + except Exception as e: + ctx.logger.error("Error adding client to feed: {er}".format(er=e)) + raise NonRecoverableError(e) + +@operation +def add_mr_publisher(**kwargs): + _add_mr_client("publisher", ["view", "pub"]) + +@operation +def add_mr_subscriber(**kwargs): + _add_mr_client("subscriber", ["view", "sub"]) + +@operation +def delete_mr_client(**kwargs): + ''' + Delete the client (publisher or subscriber). + Expect property 'client_id' to have been set in the instance's runtime_properties + when the client was created. + ''' + try: + target_topic = ctx.target.node.id + client_id = ctx.source.instance.runtime_properties[target_topic]["client_id"] + ctx.logger.info("Attempting to delete client {0} ".format(client_id)) + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + c = dmc.delete_client(client_id) + c.raise_for_status() + + ctx.logger.info("Deleted client {0}".format(client_id)) + + # Attempt to remove the entire ":dmaap" entry from the Consul KV store + # Will quietly do nothing if the entry has already been removed + ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger) + ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name'])) + + except Exception as e: + ctx.logger.error("Error deleting MR client: {er}".format(er=e)) + # don't raise a NonRecoverable error here--let the uninstall workflow continue + |