summaryrefslogtreecommitdiffstats
path: root/dmaap/dmaapplugin/dr_relationships.py
diff options
context:
space:
mode:
Diffstat (limited to 'dmaap/dmaapplugin/dr_relationships.py')
-rw-r--r--dmaap/dmaapplugin/dr_relationships.py219
1 files changed, 219 insertions, 0 deletions
diff --git a/dmaap/dmaapplugin/dr_relationships.py b/dmaap/dmaapplugin/dr_relationships.py
new file mode 100644
index 0000000..f1ff986
--- /dev/null
+++ b/dmaap/dmaapplugin/dr_relationships.py
@@ -0,0 +1,219 @@
+# ============LICENSE_START====================================================
+# org.onap.dcaegen2
+# =============================================================================
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2020 Pantheon.tech. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+from cloudify import ctx
+from cloudify.decorators import operation
+from cloudify.exceptions import NonRecoverableError
+from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, CONSUL_HOST
+from dmaapplugin.dmaaputils import random_string
+from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
+from consulif.consulif import ConsulHandle
+
+# Lifecycle operations for DMaaP Data Router
+# publish and subscribe relationships
+
+@operation
+def add_dr_publisher(**kwargs):
+ '''
+ Sets up the source of the publishes_relationship as a publisher to the feed that
+ is the target of the relationship
+ Assumes target (the feed) has the following runtime properties set
+ - feed_id
+ - log_url
+ - publish_url
+ Assumes source (the publisher) has a runtime property whose name matches the node name of the feed.
+ This is a dictionary containing one property:
+ - location (the dcaeLocationName to pass when adding the publisher to the feed)
+ Generates a user name and password that the publisher will need to use when publishing
+ Adds the following properties to the dictionary above:
+ - publish_url
+ - log_url
+ - username
+ - password
+ '''
+ try:
+ # Make sure we have a name under which to store DMaaP configuration
+ # Check early so we don't needlessly create DMaaP entities
+ if 'service_component_name' not in ctx.source.instance.runtime_properties:
+ raise Exception("Source node does not have 'service_component_name' in runtime_properties")
+
+ target_feed = ctx.target.node.id
+ ctx.logger.info("Attempting to add publisher {0} to feed {1}".format(ctx.source.node.id, target_feed))
+
+ # Set up the parameters for the add_publisher request to the DMaaP bus controller
+ feed_id = ctx.target.instance.runtime_properties["feed_id"]
+ location = ctx.source.instance.runtime_properties[target_feed]["location"]
+ username = random_string(8)
+ password = random_string(16)
+
+ # Make the request to add the publisher to the feed
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ add_pub = dmc.add_publisher(feed_id, location, username, password)
+ add_pub.raise_for_status()
+ publisher_info = add_pub.json()
+ publisher_id = publisher_info["pubId"]
+ ctx.logger.info("Added publisher id {0} to feed {1} at {2}, with user {3}, pass {4}".format(publisher_id, feed_id, location, username, password))
+
+ # Set runtime properties on the source
+ ctx.source.instance.runtime_properties[target_feed] = {
+ "publisher_id" : publisher_id,
+ "location" : location,
+ "publish_url" : ctx.target.instance.runtime_properties["publish_url"],
+ "log_url" : ctx.target.instance.runtime_properties["log_url"],
+ "username" : username,
+ "password" : password
+ }
+
+ # Set key in Consul
+ ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
+ cpy = dict(ctx.source.instance.runtime_properties[target_feed])
+ ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_feed, cpy)
+
+ except Exception as e:
+ ctx.logger.error("Error adding publisher to feed: {er}".format(er=e))
+ raise NonRecoverableError(e)
+
+
+@operation
+def delete_dr_publisher(**kwargs):
+ '''
+ Deletes publisher (the source of the publishes_files relationship)
+ from the feed (the target of the relationship).
+ Assumes that the 'publisher_id' property was added to the dictionary of feed-related properties,
+ when the publisher was added to the feed.
+ '''
+
+ try:
+ # Make sure we have a name under which to store DMaaP configuration
+ # Check early so we don't needlessly create DMaaP entities
+ if 'service_component_name' not in ctx.source.instance.runtime_properties:
+ raise Exception("Source node does not have 'service_component_name' in runtime_properties")
+
+ # Get the publisher id
+ target_feed = ctx.target.node.id
+ publisher_id = ctx.source.instance.runtime_properties[target_feed]["publisher_id"]
+ ctx.logger.info("Attempting to delete publisher {0}".format(publisher_id))
+
+ # Make the request
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ del_result = dmc.delete_publisher(publisher_id)
+ del_result.raise_for_status()
+
+ ctx.logger.info("Deleted publisher {0}".format(publisher_id))
+
+ # Attempt to remove the entire ":dmaap" entry from the Consul KV store
+ # Will quietly do nothing if the entry has already been removed
+ ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
+ ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']))
+
+ except Exception as e:
+ ctx.logger.error("Error deleting publisher: {er}".format(er=e))
+ # don't raise a NonRecoverable error here--let the uninstall workflow continue
+
+
+@operation
+def add_dr_subscriber(**kwargs):
+ '''
+ Sets up the source of the subscribes_to_files relationship as a subscriber to the
+ feed that is the target of the relationship.
+ Assumes target (the feed) has the following runtime property set
+ - feed_id
+ Assumes source (the subscriber) has a runtime property whose name matches the node name of the feed.
+ This is a dictionary containing the following properties:
+ - location (the dcaeLocationName to pass when adding the publisher to the feed)
+ - delivery_url (the URL to which data router will deliver files)
+ - username (the username data router will use when delivering files)
+ - password (the password data router will use when delivering files)
+ Adds a property to the dictionary above:
+ - subscriber_id (used to delete the subscriber in the uninstall workflow
+ '''
+ try:
+ target_feed = ctx.target.node.id
+ ctx.logger.info("Attempting to add subscriber {0} to feed {1}".format(ctx.source.node.id, target_feed))
+
+ # Get the parameters for the call
+ feed_id = ctx.target.instance.runtime_properties["feed_id"]
+ feed = ctx.source.instance.runtime_properties[target_feed]
+ location = feed["location"]
+ delivery_url = feed["delivery_url"]
+ username = feed["username"]
+ password = feed["password"]
+ decompress = feed["decompress"] if "decompress" in feed else False
+ privileged = feed["privileged"] if "privileged" in feed else False
+
+ # Make the request to add the subscriber to the feed
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ add_sub = dmc.add_subscriber(feed_id, location, delivery_url,username, password, decompress, privileged)
+ add_sub.raise_for_status()
+ subscriber_info = add_sub.json()
+ subscriber_id = subscriber_info["subId"]
+ ctx.logger.info("Added subscriber id {0} to feed {1} at {2}".format(subscriber_id, feed_id, location))
+
+ # Add subscriber_id to the runtime properties
+ # ctx.source.instance.runtime_properties[target_feed]["subscriber_id"] = subscriber_id
+ ctx.source.instance.runtime_properties[target_feed] = {
+ "subscriber_id": subscriber_id,
+ "location" : location,
+ "delivery_url" : delivery_url,
+ "username" : username,
+ "password" : password,
+ "decompress": decompress,
+ "privilegedSubscriber": privileged
+ }
+ ctx.logger.info("on source: {0}".format(ctx.source.instance.runtime_properties[target_feed]))
+
+ # Set key in Consul
+ ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
+ cpy = dict(ctx.source.instance.runtime_properties[target_feed])
+ ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_feed, cpy)
+
+ except Exception as e:
+ ctx.logger.error("Error adding subscriber to feed: {er}".format(er=e))
+ raise NonRecoverableError(e)
+
+
+@operation
+def delete_dr_subscriber(**kwargs):
+ '''
+ Deletes subscriber (the source of the subscribes_to_files relationship)
+ from the feed (the target of the relationship).
+ Assumes that the source node's runtime properties dictionary for the target feed
+ includes 'subscriber_id', set when the publisher was added to the feed.
+ '''
+ try:
+ # Get the subscriber id
+ target_feed = ctx.target.node.id
+ subscriber_id = ctx.source.instance.runtime_properties[target_feed]["subscriber_id"]
+ ctx.logger.info("Attempting to delete subscriber {0} from feed {1}".format(subscriber_id, target_feed))
+
+ # Make the request
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ del_result = dmc.delete_subscriber(subscriber_id)
+ del_result.raise_for_status()
+
+ ctx.logger.info("Deleted subscriber {0}".format(subscriber_id))
+
+ # Attempt to remove the entire ":dmaap" entry from the Consul KV store
+ # Will quietly do nothing if the entry has already been removed
+ ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
+ ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']))
+
+ except Exception as e:
+ ctx.logger.error("Error deleting subscriber: {er}".format(er=e))
+ # don't raise a NonRecoverable error here--let the uninstall workflow continue