summaryrefslogtreecommitdiffstats
path: root/dmaap/dmaapplugin/dr_lifecycle.py
diff options
context:
space:
mode:
Diffstat (limited to 'dmaap/dmaapplugin/dr_lifecycle.py')
-rw-r--r--dmaap/dmaapplugin/dr_lifecycle.py44
1 files changed, 35 insertions, 9 deletions
diff --git a/dmaap/dmaapplugin/dr_lifecycle.py b/dmaap/dmaapplugin/dr_lifecycle.py
index 45f8674..7473838 100644
--- a/dmaap/dmaapplugin/dr_lifecycle.py
+++ b/dmaap/dmaapplugin/dr_lifecycle.py
@@ -30,6 +30,7 @@ def create_feed(**kwargs):
'''
Create a new data router feed
Expects "feed_name" to be set in node properties
+ If 'feed_name' is not set or is empty, generates a random one.
Allows "feed_version", "feed_description", and "aspr_classification" as optional properties
(Sets default values if not provided )
Sets instance runtime properties:
@@ -40,9 +41,8 @@ def create_feed(**kwargs):
'''
try:
# Make sure there's a feed_name
- if "feed_name" in ctx.node.properties.keys():
- feed_name = ctx.node.properties["feed_name"]
- else:
+ feed_name = ctx.node.properties.get("feed_name")
+ if not (feed_name and feed_name.strip()):
feed_name = random_string(12)
# Set defaults/placeholders for the optional properties for the feed
@@ -76,6 +76,7 @@ def create_feed(**kwargs):
ctx.logger.error("Error creating feed: {er}".format(er=e))
raise NonRecoverableError(e)
+
@operation
def get_existing_feed(**kwargs):
'''
@@ -86,23 +87,48 @@ def get_existing_feed(**kwargs):
- "publish_url"
- "log_url"
'''
+
try:
# Make the lookup request to the controller
dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
- f = dmc.get_feed_info(ctx.node.properties["feed_id"])
- f.raise_for_status()
+ ctx.logger.info("DMaaPControllerHandle() returned")
+ feed_id_input = False
+ if "feed_id" in ctx.node.properties.keys():
+ feed_id_input = True
+ f = dmc.get_feed_info(ctx.node.properties["feed_id"])
+ elif "feed_name" in ctx.node.properties.keys():
+ feed_name = ctx.node.properties["feed_name"]
+ f = dmc.get_feed_info_by_name(feed_name)
+ if f is None:
+ ctx.logger.error("Not find existing feed with feed name {0}".format(feed_name))
+ raise ValueError("Not find existing feed with feed name " + feed_name)
+ else:
+ raise ValueError("Either feed_id or feed_name must be defined to get existing feed")
+ f.raise_for_status()
+
# Capture important properties from the result
feed = f.json()
- ctx.instance.runtime_properties["feed_id"] = ctx.node.properties["feed_id"] # Just to be consistent with newly-created node, above
+ feed_id = feed["feedId"]
+ ctx.instance.runtime_properties["feed_id"] = feed_id # Just to be consistent with newly-created node, above
ctx.instance.runtime_properties["publish_url"] = feed["publishURL"]
ctx.instance.runtime_properties["log_url"] = feed["logURL"]
- ctx.logger.info("Found existing feed with feed id {0}".format(ctx.node.properties["feed_id"]))
+ if feed_id_input:
+ ctx.logger.info("Found existing feed with feed id {0}".format(ctx.node.properties["feed_id"]))
+ else:
+ ctx.logger.info("Found existing feed with feed name {0}".format(ctx.node.properties["feed_name"]))
- except Exception as e:
- ctx.logger.error("Error getting existing feed id {id}: {er}".format(id=ctx.node.properties["feed_id"],er=e))
+ except ValueError as e:
+ ctx.logger.error("{er}".format(er=e))
+ raise NonRecoverableError(e)
+ except Exception as e:
+ if feed_id_input:
+ ctx.logger.error("Error getting existing feed id {id}: {er}".format(id=ctx.node.properties["feed_id"],er=e))
+ else:
+ ctx.logger.error("Error getting existing feed name {name}: {er}".format(name=ctx.node.properties["feed_name"],er=e))
raise NonRecoverableError(e)
+
@operation
def delete_feed(**kwargs):
'''