summaryrefslogtreecommitdiffstats
path: root/dmaap/dmaapplugin
diff options
context:
space:
mode:
Diffstat (limited to 'dmaap/dmaapplugin')
-rw-r--r--dmaap/dmaapplugin/__init__.py82
-rw-r--r--dmaap/dmaapplugin/dmaaputils.py29
-rw-r--r--dmaap/dmaapplugin/dr_bridge.py199
-rw-r--r--dmaap/dmaapplugin/dr_lifecycle.py153
-rw-r--r--dmaap/dmaapplugin/dr_relationships.py219
-rw-r--r--dmaap/dmaapplugin/mr_lifecycle.py143
-rw-r--r--dmaap/dmaapplugin/mr_relationships.py119
7 files changed, 944 insertions, 0 deletions
diff --git a/dmaap/dmaapplugin/__init__.py b/dmaap/dmaapplugin/__init__.py
new file mode 100644
index 0000000..7a760d7
--- /dev/null
+++ b/dmaap/dmaapplugin/__init__.py
@@ -0,0 +1,82 @@
+# ============LICENSE_START====================================================
+# org.onap.dcaegen2
+# =============================================================================
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2020 Pantheon.tech. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+## Get parameters for accessing the DMaaP controller
+from consulif.consulif import ConsulHandle
+from cloudify.exceptions import NonRecoverableError
+import os
+
+os.environ["REQUESTS_CA_BUNDLE"]="/opt/onap/certs/cacert.pem" # This is to handle https request thru plugin
+
+CONSUL_HOST = "consul" # Should always be a local consul agent on Cloudify Manager
+DBCL_KEY_NAME = "dmaap-plugin" # Consul key containing DMaaP data bus credentials
+# In the ONAP Kubernetes environment, bus controller address is always "dmaap-bc", on port 8080 (http) and 8443 (https)
+ONAP_SERVICE_ADDRESS = "dmaap-bc"
+HTTP_PORT = "8080"
+HTTPS_PORT = "8443"
+
+try:
+ _ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, None)
+except Exception as e:
+ raise NonRecoverableError("Error getting ConsulHandle when configuring dmaap plugin: {0}".format(e))
+
+try:
+ config = _ch.get_config(DBCL_KEY_NAME)
+except Exception as e:
+ raise NonRecoverableError("Error getting config for '{0}' from ConsulHandle when configuring dmaap plugin: {1}".format(DBCL_KEY_NAME, e))
+
+try:
+ DMAAP_USER = config['dmaap']['username']
+except Exception as e:
+ raise NonRecoverableError("Error setting DMAAP_USER while configuring dmaap plugin: {0}".format(e))
+
+try:
+ DMAAP_PASS = config['dmaap']['password']
+except Exception as e:
+ raise NonRecoverableError("Error setting DMAAP_PASS while configuring dmaap plugin: {0}".format(e))
+
+try:
+ DMAAP_OWNER = config['dmaap']['owner']
+except Exception as e:
+ raise NonRecoverableError("Error setting DMAAP_OWNER while configuring dmaap plugin: {0}".format(e))
+
+try:
+ if 'protocol' in config['dmaap']:
+ DMAAP_PROTOCOL = config['dmaap']['protocol']
+ service_port = HTTP_PORT
+ else:
+ DMAAP_PROTOCOL = 'https' # Default to https (service discovery should give us this but doesn't
+ service_port = HTTPS_PORT
+except Exception as e:
+ raise NonRecoverableError("Error setting DMAAP_PROTOCOL while configuring dmaap plugin: {0}".format(e))
+
+try:
+ if 'path' in config['dmaap']:
+ DMAAP_PATH = config['dmaap']['path']
+ else:
+ DMAAP_PATH = 'webapi' # SHould come from service discovery but Consul doesn't support it
+except Exception as e:
+ raise NonRecoverableError("Error setting DMAAP_PATH while configuring dmaap plugin: {0}".format(e))
+
+try:
+ service_address = ONAP_SERVICE_ADDRESS
+ DMAAP_API_URL = '{0}://{1}:{2}/{3}'.format(DMAAP_PROTOCOL, service_address, service_port, DMAAP_PATH)
+except Exception as e:
+ raise NonRecoverableError("Error setting DMAAP_API_URL while configuring dmaap plugin: {0}".format(e))
+
diff --git a/dmaap/dmaapplugin/dmaaputils.py b/dmaap/dmaapplugin/dmaaputils.py
new file mode 100644
index 0000000..262abcb
--- /dev/null
+++ b/dmaap/dmaapplugin/dmaaputils.py
@@ -0,0 +1,29 @@
+# ============LICENSE_START====================================================
+# org.onap.dcaegen2
+# =============================================================================
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+# Utility functions
+
+import string
+from random import SystemRandom
+
+def random_string(n):
+ '''
+ Create a random alphanumeric string, n characters long.
+ '''
+ secureRandomGen = SystemRandom()
+ return ''.join(secureRandomGen.choice(string.ascii_lowercase + string.ascii_uppercase + string.digits) for x in range(n))
diff --git a/dmaap/dmaapplugin/dr_bridge.py b/dmaap/dmaapplugin/dr_bridge.py
new file mode 100644
index 0000000..a188667
--- /dev/null
+++ b/dmaap/dmaapplugin/dr_bridge.py
@@ -0,0 +1,199 @@
+# ============LICENSE_START====================================================
+# org.onap.dcaegen2
+# =============================================================================
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2020 Pantheon.tech. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+from cloudify import ctx
+from cloudify.decorators import operation
+from cloudify.exceptions import NonRecoverableError
+from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS
+from dmaapplugin.dmaaputils import random_string
+from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
+
+# Set up a subscriber to a source feed
+def _set_up_subscriber(dmc, source_feed_id, loc, delivery_url, username, userpw):
+ # Add subscriber to source feed
+ add_sub = dmc.add_subscriber(source_feed_id, loc, delivery_url, username, userpw)
+ add_sub.raise_for_status()
+ return add_sub.json()
+
+# Set up a publisher to a target feed
+def _set_up_publisher(dmc, target_feed_id, loc):
+ username = random_string(8)
+ userpw = random_string(16)
+ add_pub = dmc.add_publisher(target_feed_id, loc, username, userpw)
+ add_pub.raise_for_status()
+ pub_info = add_pub.json()
+ return pub_info["pubId"], username, userpw
+
+# Get a central location to use when creating a publisher or subscriber
+def _get_central_location(dmc):
+ locations = dmc.get_dcae_central_locations()
+ if len(locations) < 1:
+ raise Exception('No central location found for setting up DR bridging')
+ return locations[0] # We take the first one. Typically there will be two central locations
+
+
+# Set up a "bridge" between two feeds internal to DCAE
+# A source feed "bridges_to" a target feed, meaning that anything published to
+# the source feed will be delivered to subscribers to the target feed (as well as
+# to subscribers of the source feed).
+#
+# The bridge is established by first adding a publisher to the target feed. The result of doing this
+# is a publish URL and a set of publication credentials.
+#The publish URL and publication credentials are used to set up a subscriber to the source feed.
+#I.e., we tell the source feed to deliver to an endpoint which is actually a publish
+# endpoint for the target feed.
+@operation
+def create_dr_bridge(**kwargs):
+
+ try:
+
+ # Get source and target feed ids
+ if 'feed_id' in ctx.target.instance.runtime_properties:
+ target_feed_id = ctx.target.instance.runtime_properties['feed_id']
+ else:
+ raise Exception('Target feed has no feed_id property')
+ if 'feed_id' in ctx.source.instance.runtime_properties:
+ source_feed_id = ctx.source.instance.runtime_properties['feed_id']
+ else:
+ raise Exception('Source feed has no feed_id property')
+
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+
+ # Get a location to use when creating a publisher or subscriber--a central location seems reasonable
+ loc = _get_central_location(dmc)
+
+ ctx.logger.info('Creating bridge from feed {0} to feed {1} using location {2}'.format(source_feed_id, target_feed_id, loc))
+
+ # Add publisher to target feed
+ publisher_id, username, userpw = _set_up_publisher(dmc, target_feed_id, loc)
+ ctx.logger.info("Added publisher id {0} to target feed {1} with user {2}".format(publisher_id, target_feed_id, username))
+
+ # Add subscriber to source feed
+ delivery_url = ctx.target.instance.runtime_properties['publish_url']
+ subscriber_info = _set_up_subscriber(dmc, source_feed_id, loc, delivery_url, username, userpw)
+ subscriber_id = subscriber_info["subId"]
+ ctx.logger.info("Added subscriber id {0} to source feed {1} with delivery url {2}".format(subscriber_id, source_feed_id, delivery_url))
+
+ # Save the publisher and subscriber IDs on the source node, indexed by the target node id
+ ctx.source.instance.runtime_properties[ctx.target.node.id] = {"publisher_id": publisher_id, "subscriber_id": subscriber_id}
+
+ except Exception as e:
+ ctx.logger.error("Error creating bridge: {0}".format(e))
+ raise NonRecoverableError(e)
+
+# Set up a bridge from an internal DCAE feed to a feed in an external Data Router system
+# The target feed needs to be provisioned in the external Data Router system. A publisher
+# to that feed must also be set up in the external Data Router system. The publish URL,
+# username, and password need to be captured in a target node of type dcae.nodes.ExternalTargetFeed.
+# The bridge is established by setting up a subscriber to the internal DCAE source feed using the
+# external feed publisher parameters as delivery parameters for the subscriber.
+@operation
+def create_external_dr_bridge(**kwargs):
+ try:
+
+ # Make sure target feed has full set of properties
+ if 'url' in ctx.target.node.properties and 'username' in ctx.target.node.properties and 'userpw' in ctx.target.node.properties:
+ url = ctx.target.node.properties['url']
+ username = ctx.target.node.properties['username']
+ userpw = ctx.target.node.properties['userpw']
+ else:
+ raise Exception ("Target feed missing url, username, and/or user pw")
+
+ # Make sure source feed has a feed ID
+ if 'feed_id' in ctx.source.instance.runtime_properties:
+ source_feed_id = ctx.source.instance.runtime_properties['feed_id']
+ else:
+ raise Exception('Source feed has no feed_id property')
+
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+
+ # Get a central location to use when creating subscriber
+ loc = _get_central_location(dmc)
+
+ ctx.logger.info('Creating external bridge from feed {0} to external url {1} using location {2}'.format(source_feed_id, url, loc))
+
+ # Create subscription to source feed using properties of the external target feed
+ subscriber_info = _set_up_subscriber(dmc, source_feed_id, loc, url, username, userpw)
+ subscriber_id = subscriber_info["subId"]
+ ctx.logger.info("Added subscriber id {0} to source feed {1} with delivery url {2}".format(subscriber_id, source_feed_id, url))
+
+ # Save the subscriber ID on the source node, indexed by the target node id
+ ctx.source.instance.runtime_properties[ctx.target.node.id] = {"subscriber_id": subscriber_id}
+
+ except Exception as e:
+ ctx.logger.error("Error creating external bridge: {0}".format(e))
+ raise NonRecoverableError(e)
+
+# Set up a bridge from a feed in an external Data Router system to an internal DCAE feed.
+# The bridge is established by creating a publisher on the internal DCAE feed. Then a subscription
+# to the external feed is created through manual provisioning in the external Data Router system, using
+# the publish URL and the publisher username and password for the internal feed as the delivery parameters
+# for the external subscription.
+# In order to obtain the publish URL, publisher username, and password, a blueprint using this sort of
+# bridge will typically have an output that exposes the runtime_property set on the source node in this operation.
+@operation
+def create_external_source_dr_bridge(**kwargs):
+ try:
+ # Get target feed id
+ if 'feed_id' in ctx.target.instance.runtime_properties:
+ target_feed_id = ctx.target.instance.runtime_properties['feed_id']
+ else:
+ raise Exception('Target feed has no feed_id property')
+
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+
+ # Get a central location to use when creating a publisher
+ loc = _get_central_location(dmc)
+
+ # Create a publisher on the target feed
+ publisher_id, username, userpw = _set_up_publisher(dmc, target_feed_id, loc)
+
+ # Save the publisher info on the source node, indexed by the target node
+ ctx.source.instance.runtime_properties[ctx.target.node.id] = {"publisher_id": publisher_id, "url": ctx.target.instance.runtime_properties["publish_url"], "username": username, "userpw": userpw}
+
+ except Exception as e:
+ ctx.logger.error("Error creating external source bridge: {0}".format(e))
+
+# Remove the bridge between the relationship source and target.
+# For a bridge between 2 internal feeds, deletes the subscriber on the source feed and the publisher on the target feed.
+# For a bridge to an external target feed, deletes the subscriber on the source feed.
+# For a bridge from an external source feed, deletes the publisher on the target feed.
+@operation
+def remove_dr_bridge(**kwargs):
+ try:
+
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+
+ if ctx.target.node.id in ctx.source.instance.runtime_properties:
+
+ if 'subscriber_id' in ctx.source.instance.runtime_properties[ctx.target.node.id]:
+ # Delete the subscription for this bridge
+ ctx.logger.info("Removing bridge -- deleting subscriber {0}".format(ctx.source.instance.runtime_properties[ctx.target.node.id]['subscriber_id']))
+ dmc.delete_subscriber(ctx.source.instance.runtime_properties[ctx.target.node.id]['subscriber_id'])
+
+ if 'publisher_id' in ctx.source.instance.runtime_properties:
+ # Delete the publisher for this bridge
+ ctx.logger.info("Removing bridge -- deleting publisher {0}".format(ctx.source.instance.runtime_properties[ctx.target.node.id]['publisher_id']))
+ dmc.delete_publisher(ctx.source.instance.runtime_properties[ctx.target.node.id]['publisher_id'])
+
+ ctx.logger.info("Remove bridge from {0} to {1}".format(ctx.source.node.id, ctx.target.node.id))
+
+ except Exception as e:
+ ctx.logger.error("Error removing bridge: {0}".format(e))
+ # Let the uninstall workflow proceed--don't throw a NonRecoverableError
diff --git a/dmaap/dmaapplugin/dr_lifecycle.py b/dmaap/dmaapplugin/dr_lifecycle.py
new file mode 100644
index 0000000..af37977
--- /dev/null
+++ b/dmaap/dmaapplugin/dr_lifecycle.py
@@ -0,0 +1,153 @@
+# ============LICENSE_START====================================================
+# org.onap.dcaegen2
+# =============================================================================
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2020 Pantheon.tech. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+from cloudify import ctx
+from cloudify.decorators import operation
+from cloudify.exceptions import NonRecoverableError
+from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, DMAAP_OWNER
+from dmaapplugin.dmaaputils import random_string
+from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
+
+# Lifecycle operations for DMaaP Data Router feeds
+
+@operation
+def create_feed(**kwargs):
+ '''
+ Create a new data router feed
+ Expects "feed_name" to be set in node properties
+ If 'feed_name' is not set or is empty, generates a random one.
+ Allows "feed_version", "feed_description", "aspr_classification" and "useExisting" as optional properties
+ (Sets default values if not provided )
+ Sets instance runtime properties:
+ Note that 'useExisting' is a flag indicating whether DBCL will use existing feed if the feed already exists.
+ - "feed_id"
+ - "publish_url"
+ - "log_url"
+
+ '''
+ try:
+ # Make sure there's a feed_name
+ feed_name = ctx.node.properties.get("feed_name")
+ if not (feed_name and feed_name.strip()):
+ feed_name = random_string(12)
+
+ # Set defaults/placeholders for the optional properties for the feed
+ if "feed_version" in ctx.node.properties:
+ feed_version = ctx.node.properties["feed_version"]
+ else:
+ feed_version = "0.0"
+ if "feed_description" in ctx.node.properties:
+ feed_description = ctx.node.properties["feed_description"]
+ else:
+ feed_description = "No description provided"
+ if "aspr_classification" in ctx.node.properties:
+ aspr_classification = ctx.node.properties["aspr_classification"]
+ else:
+ aspr_classification = "unclassified"
+ if "useExisting" in ctx.node.properties:
+ useExisting = ctx.node.properties["useExisting"]
+ else:
+ useExisting = False
+
+ # Make the request to the controller
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ ctx.logger.info("Attempting to create feed name {0}".format(feed_name))
+ f = dmc.create_feed(feed_name, feed_version, feed_description, aspr_classification, DMAAP_OWNER, useExisting)
+ f.raise_for_status()
+
+ # Capture important properties from the result
+ feed = f.json()
+ ctx.instance.runtime_properties["feed_id"] = feed["feedId"]
+ ctx.instance.runtime_properties["publish_url"] = feed["publishURL"]
+ ctx.instance.runtime_properties["log_url"] = feed["logURL"]
+ ctx.logger.info("Created feed name {0} with feed id {1}".format(feed_name, feed["feedId"]))
+
+ except Exception as e:
+ ctx.logger.error("Error creating feed: {er}".format(er=e))
+ raise NonRecoverableError(e)
+
+
+@operation
+def get_existing_feed(**kwargs):
+ '''
+ Find information for an existing data router feed
+ Expects "feed_id" to be set in node properties -- uniquely identifies the feed
+ Sets instance runtime properties:
+ - "feed_id"
+ - "publish_url"
+ - "log_url"
+ '''
+
+ try:
+ # Make the lookup request to the controller
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ ctx.logger.info("DMaaPControllerHandle() returned")
+ feed_id_input = False
+ if "feed_id" in ctx.node.properties:
+ feed_id_input = True
+ f = dmc.get_feed_info(ctx.node.properties["feed_id"])
+ elif "feed_name" in ctx.node.properties:
+ feed_name = ctx.node.properties["feed_name"]
+ f = dmc.get_feed_info_by_name(feed_name)
+ if f is None:
+ ctx.logger.error("Not find existing feed with feed name {0}".format(feed_name))
+ raise ValueError("Not find existing feed with feed name " + feed_name)
+ else:
+ raise ValueError("Either feed_id or feed_name must be defined to get existing feed")
+
+ f.raise_for_status()
+
+ # Capture important properties from the result
+ feed = f.json()
+ feed_id = feed["feedId"]
+ ctx.instance.runtime_properties["feed_id"] = feed_id # Just to be consistent with newly-created node, above
+ ctx.instance.runtime_properties["publish_url"] = feed["publishURL"]
+ ctx.instance.runtime_properties["log_url"] = feed["logURL"]
+ if feed_id_input:
+ ctx.logger.info("Found existing feed with feed id {0}".format(ctx.node.properties["feed_id"]))
+ else:
+ ctx.logger.info("Found existing feed with feed name {0}".format(ctx.node.properties["feed_name"]))
+
+ except ValueError as e:
+ ctx.logger.error("{er}".format(er=e))
+ raise NonRecoverableError(e)
+ except Exception as e:
+ if feed_id_input:
+ ctx.logger.error("Error getting existing feed id {id}: {er}".format(id=ctx.node.properties["feed_id"],er=e))
+ else:
+ ctx.logger.error("Error getting existing feed name {name}: {er}".format(name=ctx.node.properties["feed_name"],er=e))
+ raise NonRecoverableError(e)
+
+
+@operation
+def delete_feed(**kwargs):
+ '''
+ Delete a feed
+ Expects "feed_id" to be set on the instance's runtime properties
+ '''
+ try:
+ # Make the lookup request to the controllerid=ctx.node.properties["feed_id"]
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ f = dmc.delete_feed(ctx.instance.runtime_properties["feed_id"])
+ f.raise_for_status()
+ ctx.logger.info("Deleting feed id {0}".format(ctx.instance.runtime_properties["feed_id"]))
+
+ except Exception as e:
+ ctx.logger.error("Error deleting feed id {id}: {er}".format(id=ctx.instance.runtime_properties["feed_id"],er=e))
+ # don't raise a NonRecoverable error here--let the uninstall workflow continue
diff --git a/dmaap/dmaapplugin/dr_relationships.py b/dmaap/dmaapplugin/dr_relationships.py
new file mode 100644
index 0000000..f1ff986
--- /dev/null
+++ b/dmaap/dmaapplugin/dr_relationships.py
@@ -0,0 +1,219 @@
+# ============LICENSE_START====================================================
+# org.onap.dcaegen2
+# =============================================================================
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2020 Pantheon.tech. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+from cloudify import ctx
+from cloudify.decorators import operation
+from cloudify.exceptions import NonRecoverableError
+from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, CONSUL_HOST
+from dmaapplugin.dmaaputils import random_string
+from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
+from consulif.consulif import ConsulHandle
+
+# Lifecycle operations for DMaaP Data Router
+# publish and subscribe relationships
+
+@operation
+def add_dr_publisher(**kwargs):
+ '''
+ Sets up the source of the publishes_relationship as a publisher to the feed that
+ is the target of the relationship
+ Assumes target (the feed) has the following runtime properties set
+ - feed_id
+ - log_url
+ - publish_url
+ Assumes source (the publisher) has a runtime property whose name matches the node name of the feed.
+ This is a dictionary containing one property:
+ - location (the dcaeLocationName to pass when adding the publisher to the feed)
+ Generates a user name and password that the publisher will need to use when publishing
+ Adds the following properties to the dictionary above:
+ - publish_url
+ - log_url
+ - username
+ - password
+ '''
+ try:
+ # Make sure we have a name under which to store DMaaP configuration
+ # Check early so we don't needlessly create DMaaP entities
+ if 'service_component_name' not in ctx.source.instance.runtime_properties:
+ raise Exception("Source node does not have 'service_component_name' in runtime_properties")
+
+ target_feed = ctx.target.node.id
+ ctx.logger.info("Attempting to add publisher {0} to feed {1}".format(ctx.source.node.id, target_feed))
+
+ # Set up the parameters for the add_publisher request to the DMaaP bus controller
+ feed_id = ctx.target.instance.runtime_properties["feed_id"]
+ location = ctx.source.instance.runtime_properties[target_feed]["location"]
+ username = random_string(8)
+ password = random_string(16)
+
+ # Make the request to add the publisher to the feed
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ add_pub = dmc.add_publisher(feed_id, location, username, password)
+ add_pub.raise_for_status()
+ publisher_info = add_pub.json()
+ publisher_id = publisher_info["pubId"]
+ ctx.logger.info("Added publisher id {0} to feed {1} at {2}, with user {3}, pass {4}".format(publisher_id, feed_id, location, username, password))
+
+ # Set runtime properties on the source
+ ctx.source.instance.runtime_properties[target_feed] = {
+ "publisher_id" : publisher_id,
+ "location" : location,
+ "publish_url" : ctx.target.instance.runtime_properties["publish_url"],
+ "log_url" : ctx.target.instance.runtime_properties["log_url"],
+ "username" : username,
+ "password" : password
+ }
+
+ # Set key in Consul
+ ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
+ cpy = dict(ctx.source.instance.runtime_properties[target_feed])
+ ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_feed, cpy)
+
+ except Exception as e:
+ ctx.logger.error("Error adding publisher to feed: {er}".format(er=e))
+ raise NonRecoverableError(e)
+
+
+@operation
+def delete_dr_publisher(**kwargs):
+ '''
+ Deletes publisher (the source of the publishes_files relationship)
+ from the feed (the target of the relationship).
+ Assumes that the 'publisher_id' property was added to the dictionary of feed-related properties,
+ when the publisher was added to the feed.
+ '''
+
+ try:
+ # Make sure we have a name under which to store DMaaP configuration
+ # Check early so we don't needlessly create DMaaP entities
+ if 'service_component_name' not in ctx.source.instance.runtime_properties:
+ raise Exception("Source node does not have 'service_component_name' in runtime_properties")
+
+ # Get the publisher id
+ target_feed = ctx.target.node.id
+ publisher_id = ctx.source.instance.runtime_properties[target_feed]["publisher_id"]
+ ctx.logger.info("Attempting to delete publisher {0}".format(publisher_id))
+
+ # Make the request
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ del_result = dmc.delete_publisher(publisher_id)
+ del_result.raise_for_status()
+
+ ctx.logger.info("Deleted publisher {0}".format(publisher_id))
+
+ # Attempt to remove the entire ":dmaap" entry from the Consul KV store
+ # Will quietly do nothing if the entry has already been removed
+ ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
+ ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']))
+
+ except Exception as e:
+ ctx.logger.error("Error deleting publisher: {er}".format(er=e))
+ # don't raise a NonRecoverable error here--let the uninstall workflow continue
+
+
+@operation
+def add_dr_subscriber(**kwargs):
+ '''
+ Sets up the source of the subscribes_to_files relationship as a subscriber to the
+ feed that is the target of the relationship.
+ Assumes target (the feed) has the following runtime property set
+ - feed_id
+ Assumes source (the subscriber) has a runtime property whose name matches the node name of the feed.
+ This is a dictionary containing the following properties:
+ - location (the dcaeLocationName to pass when adding the publisher to the feed)
+ - delivery_url (the URL to which data router will deliver files)
+ - username (the username data router will use when delivering files)
+ - password (the password data router will use when delivering files)
+ Adds a property to the dictionary above:
+ - subscriber_id (used to delete the subscriber in the uninstall workflow
+ '''
+ try:
+ target_feed = ctx.target.node.id
+ ctx.logger.info("Attempting to add subscriber {0} to feed {1}".format(ctx.source.node.id, target_feed))
+
+ # Get the parameters for the call
+ feed_id = ctx.target.instance.runtime_properties["feed_id"]
+ feed = ctx.source.instance.runtime_properties[target_feed]
+ location = feed["location"]
+ delivery_url = feed["delivery_url"]
+ username = feed["username"]
+ password = feed["password"]
+ decompress = feed["decompress"] if "decompress" in feed else False
+ privileged = feed["privileged"] if "privileged" in feed else False
+
+ # Make the request to add the subscriber to the feed
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ add_sub = dmc.add_subscriber(feed_id, location, delivery_url,username, password, decompress, privileged)
+ add_sub.raise_for_status()
+ subscriber_info = add_sub.json()
+ subscriber_id = subscriber_info["subId"]
+ ctx.logger.info("Added subscriber id {0} to feed {1} at {2}".format(subscriber_id, feed_id, location))
+
+ # Add subscriber_id to the runtime properties
+ # ctx.source.instance.runtime_properties[target_feed]["subscriber_id"] = subscriber_id
+ ctx.source.instance.runtime_properties[target_feed] = {
+ "subscriber_id": subscriber_id,
+ "location" : location,
+ "delivery_url" : delivery_url,
+ "username" : username,
+ "password" : password,
+ "decompress": decompress,
+ "privilegedSubscriber": privileged
+ }
+ ctx.logger.info("on source: {0}".format(ctx.source.instance.runtime_properties[target_feed]))
+
+ # Set key in Consul
+ ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
+ cpy = dict(ctx.source.instance.runtime_properties[target_feed])
+ ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_feed, cpy)
+
+ except Exception as e:
+ ctx.logger.error("Error adding subscriber to feed: {er}".format(er=e))
+ raise NonRecoverableError(e)
+
+
+@operation
+def delete_dr_subscriber(**kwargs):
+ '''
+ Deletes subscriber (the source of the subscribes_to_files relationship)
+ from the feed (the target of the relationship).
+ Assumes that the source node's runtime properties dictionary for the target feed
+ includes 'subscriber_id', set when the publisher was added to the feed.
+ '''
+ try:
+ # Get the subscriber id
+ target_feed = ctx.target.node.id
+ subscriber_id = ctx.source.instance.runtime_properties[target_feed]["subscriber_id"]
+ ctx.logger.info("Attempting to delete subscriber {0} from feed {1}".format(subscriber_id, target_feed))
+
+ # Make the request
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ del_result = dmc.delete_subscriber(subscriber_id)
+ del_result.raise_for_status()
+
+ ctx.logger.info("Deleted subscriber {0}".format(subscriber_id))
+
+ # Attempt to remove the entire ":dmaap" entry from the Consul KV store
+ # Will quietly do nothing if the entry has already been removed
+ ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
+ ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']))
+
+ except Exception as e:
+ ctx.logger.error("Error deleting subscriber: {er}".format(er=e))
+ # don't raise a NonRecoverable error here--let the uninstall workflow continue
diff --git a/dmaap/dmaapplugin/mr_lifecycle.py b/dmaap/dmaapplugin/mr_lifecycle.py
new file mode 100644
index 0000000..6fe3023
--- /dev/null
+++ b/dmaap/dmaapplugin/mr_lifecycle.py
@@ -0,0 +1,143 @@
+# ============LICENSE_START====================================================
+# org.onap.dcaegen2
+# =============================================================================
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2020 Pantheon.tech. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+from cloudify import ctx
+from cloudify.decorators import operation
+from cloudify.exceptions import NonRecoverableError
+from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, DMAAP_OWNER
+from dmaapplugin.dmaaputils import random_string
+from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
+
+# Lifecycle operations for DMaaP Message Router topics
+@operation
+def create_topic(**kwargs):
+ '''
+ Creates a message router topic.
+ Allows 'topic_name', 'topic_description', 'txenable', 'replication_case', 'global_mr_url',
+ and 'useExisting' as optional node properties. If 'topic_name' is not set,
+ generates a random one.
+ Sets 'fqtn' in the instance runtime_properties.
+ Note that 'txenable' is a Message Router flag indicating whether transactions
+ are enabled on the topic.
+ Note that 'useExisting' is a flag indicating whether DBCL will use existing topic if
+ the topic already exists.
+ '''
+ try:
+ # Make sure there's a topic_name
+ if "topic_name" in ctx.node.properties:
+ topic_name = ctx.node.properties["topic_name"]
+ if topic_name == '' or topic_name.isspace():
+ topic_name = random_string(12)
+ else:
+ topic_name = random_string(12)
+
+ # Make sure there's a topic description
+ if "topic_description" in ctx.node.properties:
+ topic_description = ctx.node.properties["topic_description"]
+ else:
+ topic_description = "No description provided"
+
+ # ..and the truly optional setting
+ if "txenable" in ctx.node.properties:
+ txenable = ctx.node.properties["txenable"]
+ else:
+ txenable= False
+
+ if "replication_case" in ctx.node.properties:
+ replication_case = ctx.node.properties["replication_case"]
+ else:
+ replication_case = None
+
+ if "global_mr_url" in ctx.node.properties:
+ global_mr_url = ctx.node.properties["global_mr_url"]
+ else:
+ global_mr_url = None
+
+ if "useExisting" in ctx.node.properties:
+ useExisting = ctx.node.properties["useExisting"]
+ else:
+ useExisting = False
+
+ # Make the request to the controller
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ ctx.logger.info("Attempting to create topic name {0}".format(topic_name))
+ t = dmc.create_topic(topic_name, topic_description, txenable, DMAAP_OWNER, replication_case, global_mr_url, useExisting)
+ t.raise_for_status()
+
+ # Capture important properties from the result
+ topic = t.json()
+ ctx.instance.runtime_properties["fqtn"] = topic["fqtn"]
+
+ except Exception as e:
+ ctx.logger.error("Error creating topic: {er}".format(er=e))
+ raise NonRecoverableError(e)
+
+@operation
+def get_existing_topic(**kwargs):
+ '''
+ Get data for an existing topic.
+ Expects 'fqtn' as a node property.
+ Copies this property to 'fqtn' in runtime properties for consistency
+ with a newly-created topic.
+ While there's no real need to make a call to the DMaaP bus controller,
+ we do so just to make sure the fqtn is known to the controller, so we
+ don't run into problems when we try to add a publisher or subscriber later.
+ '''
+ try:
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ fqtn_input = False
+ if "fqtn" in ctx.node.properties:
+ fqtn = ctx.node.properties["fqtn"]
+ fqtn_input = True
+ elif "topic_name" in ctx.node.properties:
+ topic_name = ctx.node.properties["topic_name"]
+ ctx.logger.info("Attempting to get fqtn for existing topic {0}".format(topic_name))
+ fqtn = dmc.get_topic_fqtn_by_name(topic_name)
+ if fqtn is None:
+ raise ValueError("Not find existing topic with name " + topic_name)
+ else:
+ ctx.logger.error("Not find existing topic with name {0}".format(topic_name))
+ raise ValueError("Either fqtn or topic_name must be defined to get existing topic")
+
+ ctx.logger.info("Attempting to get info for existing topic {0}".format(fqtn))
+ t = dmc.get_topic_info(fqtn)
+ t.raise_for_status()
+
+ ctx.instance.runtime_properties["fqtn"] = fqtn
+
+ except Exception as e:
+ ctx.logger.error("Error getting existing topic: {er}".format(er=e))
+ raise NonRecoverableError(e)
+
+@operation
+def delete_topic(**kwargs):
+ '''
+ Delete the topic. Expects the instance runtime property "fqtn" to have been
+ set when the topic was created.
+ '''
+ try:
+ fqtn = ctx.instance.runtime_properties["fqtn"]
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ ctx.logger.info("Attempting to delete topic {0}".format(fqtn))
+ t = dmc.delete_topic(fqtn)
+ t.raise_for_status()
+
+ except Exception as e:
+ ctx.logger.error("Error getting existing topic: {er}".format(er=e))
+ # don't raise a NonRecoverable error here--let the uninstall workflow continue
diff --git a/dmaap/dmaapplugin/mr_relationships.py b/dmaap/dmaapplugin/mr_relationships.py
new file mode 100644
index 0000000..34d02e2
--- /dev/null
+++ b/dmaap/dmaapplugin/mr_relationships.py
@@ -0,0 +1,119 @@
+# ============LICENSE_START====================================================
+# org.onap.dcaegen2
+# =============================================================================
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+from cloudify import ctx
+from cloudify.decorators import operation
+from cloudify.exceptions import NonRecoverableError
+from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, DMAAP_OWNER, CONSUL_HOST
+from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
+from consulif.consulif import ConsulHandle
+
+# Message router relationship operations
+
+def _add_mr_client(ctype, actions):
+ '''
+ Adds the node represented by 'source' as a client (publisher or subscriber) to
+ to topic represented by the 'target' node. The list of actions in 'actions'
+ determines whether the client is a subscriber or a publisher.
+
+ Assumes target (the topic) has the following runtime property set
+ - fqtn
+ Assumes source (the client) has a runtime property whose name matches the node name of the feed.
+ This is a dictionary containing the following properties:
+ - location (the dcaeLocationName to pass when adding the client to the topic)
+ - client_role (the AAF client role under which the client will access the topic)
+ Adds two properties to the dictionary above:
+ - topic_url (the URL that the client can use to access the topic)
+ - client_id (used to delete the client in the uninstall workflow)
+ '''
+ try:
+ # Make sure we have a name under which to store DMaaP configuration
+ # Check early so we don't needlessly create DMaaP entities
+ if 'service_component_name' not in ctx.source.instance.runtime_properties:
+ raise Exception("Source node does not have 'service_component_name' in runtime_properties")
+
+ target_topic = ctx.target.node.id # Key for the source's dictionary with topic-related info
+ fqtn = ctx.target.instance.runtime_properties["fqtn"]
+ ctx.logger.info("Attempting to add {0} as {1} to topic {2}".format(ctx.source.node.id, ctype, fqtn))
+
+ # Get the parameters needed for adding the client
+ location = ctx.source.instance.runtime_properties[target_topic]["location"]
+ client_role = ctx.source.instance.runtime_properties[target_topic]["client_role"]
+
+ # Make the request to add the client to the topic
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ c = dmc.create_client(fqtn, location, client_role, actions)
+ c.raise_for_status()
+ client_info = c.json()
+ client_id = client_info["mrClientId"]
+ topic_url = client_info["topicURL"]
+
+ # Update source's runtime properties
+ #ctx.source.instance.runtime_properties[target_topic]["topic_url"] = topic_url
+ #ctx.source.instance.runtime_properties[target_topic]["client_id"] = client_id
+ ctx.source.instance.runtime_properties[target_topic] = {
+ "topic_url" : topic_url,
+ "client_id" : client_id,
+ "location" : location,
+ "client_role" : client_role
+ }
+
+ ctx.logger.info("Added {0} id {1} to feed {2} at {3}".format(ctype, client_id, fqtn, location))
+
+ # Set key in Consul
+ ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
+ ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_topic, ctx.source.instance.runtime_properties[target_topic])
+
+ except Exception as e:
+ ctx.logger.error("Error adding client to feed: {er}".format(er=e))
+ raise NonRecoverableError(e)
+
+@operation
+def add_mr_publisher(**kwargs):
+ _add_mr_client("publisher", ["view", "pub"])
+
+@operation
+def add_mr_subscriber(**kwargs):
+ _add_mr_client("subscriber", ["view", "sub"])
+
+@operation
+def delete_mr_client(**kwargs):
+ '''
+ Delete the client (publisher or subscriber).
+ Expect property 'client_id' to have been set in the instance's runtime_properties
+ when the client was created.
+ '''
+ try:
+ target_topic = ctx.target.node.id
+ client_id = ctx.source.instance.runtime_properties[target_topic]["client_id"]
+ ctx.logger.info("Attempting to delete client {0} ".format(client_id))
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ c = dmc.delete_client(client_id)
+ c.raise_for_status()
+
+ ctx.logger.info("Deleted client {0}".format(client_id))
+
+ # Attempt to remove the entire ":dmaap" entry from the Consul KV store
+ # Will quietly do nothing if the entry has already been removed
+ ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
+ ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']))
+
+ except Exception as e:
+ ctx.logger.error("Error deleting MR client: {er}".format(er=e))
+ # don't raise a NonRecoverable error here--let the uninstall workflow continue
+