diff options
Diffstat (limited to 'dmaap/dmaapplugin')
-rw-r--r-- | dmaap/dmaapplugin/dr_lifecycle.py | 25 | ||||
-rw-r--r-- | dmaap/dmaapplugin/mr_lifecycle.py | 12 |
2 files changed, 24 insertions, 13 deletions
diff --git a/dmaap/dmaapplugin/dr_lifecycle.py b/dmaap/dmaapplugin/dr_lifecycle.py index 29811fa..718158a 100644 --- a/dmaap/dmaapplugin/dr_lifecycle.py +++ b/dmaap/dmaapplugin/dr_lifecycle.py @@ -31,9 +31,10 @@ def create_feed(**kwargs): Create a new data router feed Expects "feed_name" to be set in node properties If 'feed_name' is not set or is empty, generates a random one. - Allows "feed_version", "feed_description", and "aspr_classification" as optional properties + Allows "feed_version", "feed_description", "aspr_classification" and "useExisting" as optional properties (Sets default values if not provided ) Sets instance runtime properties: + Note that 'useExisting' is a flag indicating whether DBCL will use existing feed if the feed already exists. - "feed_id" - "publish_url" - "log_url" @@ -58,11 +59,15 @@ def create_feed(**kwargs): aspr_classification = ctx.node.properties["aspr_classification"] else: aspr_classification = "unclassified" + if "useExisting" in ctx.node.properties.keys(): + useExisting = ctx.node.properties["useExisting"] + else: + useExisting = False # Make the request to the controller dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) ctx.logger.info("Attempting to create feed name {0}".format(feed_name)) - f = dmc.create_feed(feed_name, feed_version, feed_description, aspr_classification, DMAAP_OWNER) + f = dmc.create_feed(feed_name, feed_version, feed_description, aspr_classification, DMAAP_OWNER, useExisting) f.raise_for_status() # Capture important properties from the result @@ -102,11 +107,11 @@ def get_existing_feed(**kwargs): if f is None: ctx.logger.error("Not find existing feed with feed name {0}".format(feed_name)) raise ValueError("Not find existing feed with feed name " + feed_name) - else: + else: raise ValueError("Either feed_id or feed_name must be defined to get existing feed") f.raise_for_status() - + # Capture important properties from the result feed = f.json() feed_id = feed["feedId"] @@ -118,17 +123,17 @@ def get_existing_feed(**kwargs): else: ctx.logger.info("Found existing feed with feed name {0}".format(ctx.node.properties["feed_name"])) - except ValueError as e: - ctx.logger.error("{er}".format(er=e)) + except ValueError as e: + ctx.logger.error("{er}".format(er=e)) raise NonRecoverableError(e) - except Exception as e: + except Exception as e: if feed_id_input: - ctx.logger.error("Error getting existing feed id {id}: {er}".format(id=ctx.node.properties["feed_id"],er=e)) + ctx.logger.error("Error getting existing feed id {id}: {er}".format(id=ctx.node.properties["feed_id"],er=e)) else: - ctx.logger.error("Error getting existing feed name {name}: {er}".format(name=ctx.node.properties["feed_name"],er=e)) + ctx.logger.error("Error getting existing feed name {name}: {er}".format(name=ctx.node.properties["feed_name"],er=e)) raise NonRecoverableError(e) - + @operation def delete_feed(**kwargs): ''' diff --git a/dmaap/dmaapplugin/mr_lifecycle.py b/dmaap/dmaapplugin/mr_lifecycle.py index a4f04ec..ec674de 100644 --- a/dmaap/dmaapplugin/mr_lifecycle.py +++ b/dmaap/dmaapplugin/mr_lifecycle.py @@ -28,12 +28,14 @@ from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle def create_topic(**kwargs): ''' Creates a message router topic. - Allows 'topic_name', 'topic_description', 'txenable', 'replication_case', - and 'global_mr_url' as optional node properties. If 'topic_name' is not set, + Allows 'topic_name', 'topic_description', 'txenable', 'replication_case', 'global_mr_url', + and 'useExisting' as optional node properties. If 'topic_name' is not set, generates a random one. Sets 'fqtn' in the instance runtime_properties. Note that 'txenable' is a Message Router flag indicating whether transactions are enabled on the topic. + Note that 'useExisting' is a flag indicating whether DBCL will use existing topic if + the topic already exists. ''' try: # Make sure there's a topic_name @@ -66,11 +68,15 @@ def create_topic(**kwargs): else: global_mr_url = None + if "useExisting" in ctx.node.properties: + useExisting = ctx.node.properties["useExisting"] + else: + useExisting = False # Make the request to the controller dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) ctx.logger.info("Attempting to create topic name {0}".format(topic_name)) - t = dmc.create_topic(topic_name, topic_description, txenable, DMAAP_OWNER, replication_case, global_mr_url) + t = dmc.create_topic(topic_name, topic_description, txenable, DMAAP_OWNER, replication_case, global_mr_url, useExisting) t.raise_for_status() # Capture important properties from the result |