From 7e1efe3174336fa09a56c596af55ba93d7b14a91 Mon Sep 17 00:00:00 2001 From: "Hansen, Tony (th1395)" Date: Wed, 13 May 2020 18:55:54 +0000 Subject: move plugins from from ccsdk to dcaegen2 copy dmaap, helm, pgaas and sshkeyshare plugins from ccsdk to dcaegen2 Change-Id: Ib257495de6c275c45f0c87a4b42ac21a2fab7979 Signed-off-by: Hansen, Tony (th1395) Issue-ID: DCAEGEN2-2207 Signed-off-by: Hansen, Tony (th1395) --- dmaap/dmaapplugin/dr_relationships.py | 219 ++++++++++++++++++++++++++++++++++ 1 file changed, 219 insertions(+) create mode 100644 dmaap/dmaapplugin/dr_relationships.py (limited to 'dmaap/dmaapplugin/dr_relationships.py') diff --git a/dmaap/dmaapplugin/dr_relationships.py b/dmaap/dmaapplugin/dr_relationships.py new file mode 100644 index 0000000..f1ff986 --- /dev/null +++ b/dmaap/dmaapplugin/dr_relationships.py @@ -0,0 +1,219 @@ +# ============LICENSE_START==================================================== +# org.onap.dcaegen2 +# ============================================================================= +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2020 Pantheon.tech. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +from cloudify import ctx +from cloudify.decorators import operation +from cloudify.exceptions import NonRecoverableError +from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, CONSUL_HOST +from dmaapplugin.dmaaputils import random_string +from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle +from consulif.consulif import ConsulHandle + +# Lifecycle operations for DMaaP Data Router +# publish and subscribe relationships + +@operation +def add_dr_publisher(**kwargs): + ''' + Sets up the source of the publishes_relationship as a publisher to the feed that + is the target of the relationship + Assumes target (the feed) has the following runtime properties set + - feed_id + - log_url + - publish_url + Assumes source (the publisher) has a runtime property whose name matches the node name of the feed. + This is a dictionary containing one property: + - location (the dcaeLocationName to pass when adding the publisher to the feed) + Generates a user name and password that the publisher will need to use when publishing + Adds the following properties to the dictionary above: + - publish_url + - log_url + - username + - password + ''' + try: + # Make sure we have a name under which to store DMaaP configuration + # Check early so we don't needlessly create DMaaP entities + if 'service_component_name' not in ctx.source.instance.runtime_properties: + raise Exception("Source node does not have 'service_component_name' in runtime_properties") + + target_feed = ctx.target.node.id + ctx.logger.info("Attempting to add publisher {0} to feed {1}".format(ctx.source.node.id, target_feed)) + + # Set up the parameters for the add_publisher request to the DMaaP bus controller + feed_id = ctx.target.instance.runtime_properties["feed_id"] + location = ctx.source.instance.runtime_properties[target_feed]["location"] + username = random_string(8) + password = random_string(16) + + # Make the request to add the publisher to the feed + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + add_pub = dmc.add_publisher(feed_id, location, username, password) + add_pub.raise_for_status() + publisher_info = add_pub.json() + publisher_id = publisher_info["pubId"] + ctx.logger.info("Added publisher id {0} to feed {1} at {2}, with user {3}, pass {4}".format(publisher_id, feed_id, location, username, password)) + + # Set runtime properties on the source + ctx.source.instance.runtime_properties[target_feed] = { + "publisher_id" : publisher_id, + "location" : location, + "publish_url" : ctx.target.instance.runtime_properties["publish_url"], + "log_url" : ctx.target.instance.runtime_properties["log_url"], + "username" : username, + "password" : password + } + + # Set key in Consul + ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger) + cpy = dict(ctx.source.instance.runtime_properties[target_feed]) + ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_feed, cpy) + + except Exception as e: + ctx.logger.error("Error adding publisher to feed: {er}".format(er=e)) + raise NonRecoverableError(e) + + +@operation +def delete_dr_publisher(**kwargs): + ''' + Deletes publisher (the source of the publishes_files relationship) + from the feed (the target of the relationship). + Assumes that the 'publisher_id' property was added to the dictionary of feed-related properties, + when the publisher was added to the feed. + ''' + + try: + # Make sure we have a name under which to store DMaaP configuration + # Check early so we don't needlessly create DMaaP entities + if 'service_component_name' not in ctx.source.instance.runtime_properties: + raise Exception("Source node does not have 'service_component_name' in runtime_properties") + + # Get the publisher id + target_feed = ctx.target.node.id + publisher_id = ctx.source.instance.runtime_properties[target_feed]["publisher_id"] + ctx.logger.info("Attempting to delete publisher {0}".format(publisher_id)) + + # Make the request + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + del_result = dmc.delete_publisher(publisher_id) + del_result.raise_for_status() + + ctx.logger.info("Deleted publisher {0}".format(publisher_id)) + + # Attempt to remove the entire ":dmaap" entry from the Consul KV store + # Will quietly do nothing if the entry has already been removed + ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger) + ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name'])) + + except Exception as e: + ctx.logger.error("Error deleting publisher: {er}".format(er=e)) + # don't raise a NonRecoverable error here--let the uninstall workflow continue + + +@operation +def add_dr_subscriber(**kwargs): + ''' + Sets up the source of the subscribes_to_files relationship as a subscriber to the + feed that is the target of the relationship. + Assumes target (the feed) has the following runtime property set + - feed_id + Assumes source (the subscriber) has a runtime property whose name matches the node name of the feed. + This is a dictionary containing the following properties: + - location (the dcaeLocationName to pass when adding the publisher to the feed) + - delivery_url (the URL to which data router will deliver files) + - username (the username data router will use when delivering files) + - password (the password data router will use when delivering files) + Adds a property to the dictionary above: + - subscriber_id (used to delete the subscriber in the uninstall workflow + ''' + try: + target_feed = ctx.target.node.id + ctx.logger.info("Attempting to add subscriber {0} to feed {1}".format(ctx.source.node.id, target_feed)) + + # Get the parameters for the call + feed_id = ctx.target.instance.runtime_properties["feed_id"] + feed = ctx.source.instance.runtime_properties[target_feed] + location = feed["location"] + delivery_url = feed["delivery_url"] + username = feed["username"] + password = feed["password"] + decompress = feed["decompress"] if "decompress" in feed else False + privileged = feed["privileged"] if "privileged" in feed else False + + # Make the request to add the subscriber to the feed + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + add_sub = dmc.add_subscriber(feed_id, location, delivery_url,username, password, decompress, privileged) + add_sub.raise_for_status() + subscriber_info = add_sub.json() + subscriber_id = subscriber_info["subId"] + ctx.logger.info("Added subscriber id {0} to feed {1} at {2}".format(subscriber_id, feed_id, location)) + + # Add subscriber_id to the runtime properties + # ctx.source.instance.runtime_properties[target_feed]["subscriber_id"] = subscriber_id + ctx.source.instance.runtime_properties[target_feed] = { + "subscriber_id": subscriber_id, + "location" : location, + "delivery_url" : delivery_url, + "username" : username, + "password" : password, + "decompress": decompress, + "privilegedSubscriber": privileged + } + ctx.logger.info("on source: {0}".format(ctx.source.instance.runtime_properties[target_feed])) + + # Set key in Consul + ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger) + cpy = dict(ctx.source.instance.runtime_properties[target_feed]) + ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_feed, cpy) + + except Exception as e: + ctx.logger.error("Error adding subscriber to feed: {er}".format(er=e)) + raise NonRecoverableError(e) + + +@operation +def delete_dr_subscriber(**kwargs): + ''' + Deletes subscriber (the source of the subscribes_to_files relationship) + from the feed (the target of the relationship). + Assumes that the source node's runtime properties dictionary for the target feed + includes 'subscriber_id', set when the publisher was added to the feed. + ''' + try: + # Get the subscriber id + target_feed = ctx.target.node.id + subscriber_id = ctx.source.instance.runtime_properties[target_feed]["subscriber_id"] + ctx.logger.info("Attempting to delete subscriber {0} from feed {1}".format(subscriber_id, target_feed)) + + # Make the request + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + del_result = dmc.delete_subscriber(subscriber_id) + del_result.raise_for_status() + + ctx.logger.info("Deleted subscriber {0}".format(subscriber_id)) + + # Attempt to remove the entire ":dmaap" entry from the Consul KV store + # Will quietly do nothing if the entry has already been removed + ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger) + ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name'])) + + except Exception as e: + ctx.logger.error("Error deleting subscriber: {er}".format(er=e)) + # don't raise a NonRecoverable error here--let the uninstall workflow continue -- cgit 1.2.3-korg