From 556b233513db587f5d3982628dbca200fd3a9e2a Mon Sep 17 00:00:00 2001 From: Jack Lucas Date: Fri, 19 Jul 2019 09:10:01 -0400 Subject: DMaaP add support for useExisting Issue-ID: DCAEGEN2-1670 Signed-off-by: Jack Lucas Change-Id: I0a945117b7f4084191245ea56af2712b2cd20d4c --- .gitignore | 1 + dmaap/dmaap.yaml | 8 +++++++- dmaap/dmaapcontrollerif/dmaap_requests.py | 20 +++++++++++++------- dmaap/dmaapplugin/dr_lifecycle.py | 25 +++++++++++++++---------- dmaap/dmaapplugin/mr_lifecycle.py | 12 +++++++++--- dmaap/setup.py | 2 +- 6 files changed, 46 insertions(+), 22 deletions(-) diff --git a/.gitignore b/.gitignore index 71d606c..1c61431 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ helm/nosetests.xml **/*.wgn **/*.egg-info **/.testenv +.vscode/* diff --git a/dmaap/dmaap.yaml b/dmaap/dmaap.yaml index ed56cf7..4a47a7f 100644 --- a/dmaap/dmaap.yaml +++ b/dmaap/dmaap.yaml @@ -25,7 +25,7 @@ plugins: dmaapplugin: executor: 'central_deployment_agent' package_name: dmaap - package_version: 1.3.4 + package_version: 1.3.5 node_types: @@ -47,6 +47,9 @@ node_types: aspr_classification: type: string required: false + useExisting: + type: boolean + required: false interfaces: cloudify.interfaces.lifecycle: @@ -115,6 +118,9 @@ node_types: global_mr_url: type: string required: false + useExisting: + type: boolean + required: false interfaces: cloudify.interfaces.lifecycle: diff --git a/dmaap/dmaapcontrollerif/dmaap_requests.py b/dmaap/dmaapcontrollerif/dmaap_requests.py index 813a1d8..231953a 100644 --- a/dmaap/dmaapcontrollerif/dmaap_requests.py +++ b/dmaap/dmaapcontrollerif/dmaap_requests.py @@ -89,11 +89,11 @@ class DMaaPControllerHandle(object): ### PUBLIC API ### # Data Router Feeds - def create_feed(self, name, version=None, description=None, aspr_class=None, owner=None): + def create_feed(self, name, version=None, description=None, aspr_class=None, owner=None, useExisting=None): ''' Create a DMaaP data router feed with the given feed name and (optionally) feed version, feed description, ASPR classification, - and owner + owner, and useExisting flag ''' feed_definition = {'feedName' : name} if version: @@ -104,8 +104,11 @@ class DMaaPControllerHandle(object): feed_definition['asprClassification'] = aspr_class if owner: feed_definition['owner'] = owner + feeds_path_query = self.feeds_path + if useExisting == True: # It's a boolean! + feeds_path_query += "?useExisting=true" - return self._create_resource(self.feeds_path, feed_definition) + return self._create_resource(feeds_path_query, feed_definition) def get_feed_info(self, feed_id): ''' @@ -197,12 +200,12 @@ class DMaaPControllerHandle(object): return self._delete_resource("{0}/{1}".format(self.subs_path, sub_id)) # Message router topics - def create_topic(self, name, description = None, txenable = None, owner = None, replication_case = None, global_mr_url = None): + def create_topic(self, name, description = None, txenable = None, owner = None, replication_case = None, global_mr_url = None, useExisting = None): ''' Create a message router topic with the topic name 'name' and optionally the topic_description - 'description', the 'txenable' flag and the topic owner 'owner'. + 'description', the 'txenable' flag, the 'useExisting' flag and the topic owner 'owner'. ''' - topic_definition = {'topicName' : name}; + topic_definition = {'topicName' : name} if description: topic_definition['topicDescription'] = description if owner: @@ -213,8 +216,11 @@ class DMaaPControllerHandle(object): topic_definition['replicationCase'] = replication_case if global_mr_url: topic_definition['globalMrURL'] = global_mr_url + topics_path_query = self.topics_path + if useExisting == True: # It's a boolean! + topics_path_query += "?useExisting=true" - return self._create_resource(self.topics_path, topic_definition) + return self._create_resource(topics_path_query, topic_definition) def get_topic_info(self, fqtn): ''' diff --git a/dmaap/dmaapplugin/dr_lifecycle.py b/dmaap/dmaapplugin/dr_lifecycle.py index 29811fa..718158a 100644 --- a/dmaap/dmaapplugin/dr_lifecycle.py +++ b/dmaap/dmaapplugin/dr_lifecycle.py @@ -31,9 +31,10 @@ def create_feed(**kwargs): Create a new data router feed Expects "feed_name" to be set in node properties If 'feed_name' is not set or is empty, generates a random one. - Allows "feed_version", "feed_description", and "aspr_classification" as optional properties + Allows "feed_version", "feed_description", "aspr_classification" and "useExisting" as optional properties (Sets default values if not provided ) Sets instance runtime properties: + Note that 'useExisting' is a flag indicating whether DBCL will use existing feed if the feed already exists. - "feed_id" - "publish_url" - "log_url" @@ -58,11 +59,15 @@ def create_feed(**kwargs): aspr_classification = ctx.node.properties["aspr_classification"] else: aspr_classification = "unclassified" + if "useExisting" in ctx.node.properties.keys(): + useExisting = ctx.node.properties["useExisting"] + else: + useExisting = False # Make the request to the controller dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) ctx.logger.info("Attempting to create feed name {0}".format(feed_name)) - f = dmc.create_feed(feed_name, feed_version, feed_description, aspr_classification, DMAAP_OWNER) + f = dmc.create_feed(feed_name, feed_version, feed_description, aspr_classification, DMAAP_OWNER, useExisting) f.raise_for_status() # Capture important properties from the result @@ -102,11 +107,11 @@ def get_existing_feed(**kwargs): if f is None: ctx.logger.error("Not find existing feed with feed name {0}".format(feed_name)) raise ValueError("Not find existing feed with feed name " + feed_name) - else: + else: raise ValueError("Either feed_id or feed_name must be defined to get existing feed") f.raise_for_status() - + # Capture important properties from the result feed = f.json() feed_id = feed["feedId"] @@ -118,17 +123,17 @@ def get_existing_feed(**kwargs): else: ctx.logger.info("Found existing feed with feed name {0}".format(ctx.node.properties["feed_name"])) - except ValueError as e: - ctx.logger.error("{er}".format(er=e)) + except ValueError as e: + ctx.logger.error("{er}".format(er=e)) raise NonRecoverableError(e) - except Exception as e: + except Exception as e: if feed_id_input: - ctx.logger.error("Error getting existing feed id {id}: {er}".format(id=ctx.node.properties["feed_id"],er=e)) + ctx.logger.error("Error getting existing feed id {id}: {er}".format(id=ctx.node.properties["feed_id"],er=e)) else: - ctx.logger.error("Error getting existing feed name {name}: {er}".format(name=ctx.node.properties["feed_name"],er=e)) + ctx.logger.error("Error getting existing feed name {name}: {er}".format(name=ctx.node.properties["feed_name"],er=e)) raise NonRecoverableError(e) - + @operation def delete_feed(**kwargs): ''' diff --git a/dmaap/dmaapplugin/mr_lifecycle.py b/dmaap/dmaapplugin/mr_lifecycle.py index a4f04ec..ec674de 100644 --- a/dmaap/dmaapplugin/mr_lifecycle.py +++ b/dmaap/dmaapplugin/mr_lifecycle.py @@ -28,12 +28,14 @@ from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle def create_topic(**kwargs): ''' Creates a message router topic. - Allows 'topic_name', 'topic_description', 'txenable', 'replication_case', - and 'global_mr_url' as optional node properties. If 'topic_name' is not set, + Allows 'topic_name', 'topic_description', 'txenable', 'replication_case', 'global_mr_url', + and 'useExisting' as optional node properties. If 'topic_name' is not set, generates a random one. Sets 'fqtn' in the instance runtime_properties. Note that 'txenable' is a Message Router flag indicating whether transactions are enabled on the topic. + Note that 'useExisting' is a flag indicating whether DBCL will use existing topic if + the topic already exists. ''' try: # Make sure there's a topic_name @@ -66,11 +68,15 @@ def create_topic(**kwargs): else: global_mr_url = None + if "useExisting" in ctx.node.properties: + useExisting = ctx.node.properties["useExisting"] + else: + useExisting = False # Make the request to the controller dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) ctx.logger.info("Attempting to create topic name {0}".format(topic_name)) - t = dmc.create_topic(topic_name, topic_description, txenable, DMAAP_OWNER, replication_case, global_mr_url) + t = dmc.create_topic(topic_name, topic_description, txenable, DMAAP_OWNER, replication_case, global_mr_url, useExisting) t.raise_for_status() # Capture important properties from the result diff --git a/dmaap/setup.py b/dmaap/setup.py index 955f181..c423d95 100644 --- a/dmaap/setup.py +++ b/dmaap/setup.py @@ -2,7 +2,7 @@ from setuptools import setup, find_packages setup( name = "dmaap", - version = "1.3.4", + version = "1.3.5", packages=find_packages(), author = "AT&T", description = ("Cloudify plugin for creating DMaaP feeds and topics, and setting up publishers and subscribers."), -- cgit 1.2.3-korg