summaryrefslogtreecommitdiffstats
path: root/dmaap/dmaapplugin
diff options
context:
space:
mode:
authorJack Lucas <jflucas@research.att.com>2019-07-19 09:10:01 -0400
committerJack Lucas <jflucas@research.att.com>2019-07-19 15:26:46 -0400
commit556b233513db587f5d3982628dbca200fd3a9e2a (patch)
tree29aa301c39b7ce51bbbad972482db3b9ab730dee /dmaap/dmaapplugin
parent2aa97d09d9fc97a1562a3e3ac966bd48130086a4 (diff)
DMaaP add support for useExisting
Issue-ID: DCAEGEN2-1670 Signed-off-by: Jack Lucas <jflucas@research.att.com> Change-Id: I0a945117b7f4084191245ea56af2712b2cd20d4c
Diffstat (limited to 'dmaap/dmaapplugin')
-rw-r--r--dmaap/dmaapplugin/dr_lifecycle.py25
-rw-r--r--dmaap/dmaapplugin/mr_lifecycle.py12
2 files changed, 24 insertions, 13 deletions
diff --git a/dmaap/dmaapplugin/dr_lifecycle.py b/dmaap/dmaapplugin/dr_lifecycle.py
index 29811fa..718158a 100644
--- a/dmaap/dmaapplugin/dr_lifecycle.py
+++ b/dmaap/dmaapplugin/dr_lifecycle.py
@@ -31,9 +31,10 @@ def create_feed(**kwargs):
Create a new data router feed
Expects "feed_name" to be set in node properties
If 'feed_name' is not set or is empty, generates a random one.
- Allows "feed_version", "feed_description", and "aspr_classification" as optional properties
+ Allows "feed_version", "feed_description", "aspr_classification" and "useExisting" as optional properties
(Sets default values if not provided )
Sets instance runtime properties:
+ Note that 'useExisting' is a flag indicating whether DBCL will use existing feed if the feed already exists.
- "feed_id"
- "publish_url"
- "log_url"
@@ -58,11 +59,15 @@ def create_feed(**kwargs):
aspr_classification = ctx.node.properties["aspr_classification"]
else:
aspr_classification = "unclassified"
+ if "useExisting" in ctx.node.properties.keys():
+ useExisting = ctx.node.properties["useExisting"]
+ else:
+ useExisting = False
# Make the request to the controller
dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
ctx.logger.info("Attempting to create feed name {0}".format(feed_name))
- f = dmc.create_feed(feed_name, feed_version, feed_description, aspr_classification, DMAAP_OWNER)
+ f = dmc.create_feed(feed_name, feed_version, feed_description, aspr_classification, DMAAP_OWNER, useExisting)
f.raise_for_status()
# Capture important properties from the result
@@ -102,11 +107,11 @@ def get_existing_feed(**kwargs):
if f is None:
ctx.logger.error("Not find existing feed with feed name {0}".format(feed_name))
raise ValueError("Not find existing feed with feed name " + feed_name)
- else:
+ else:
raise ValueError("Either feed_id or feed_name must be defined to get existing feed")
f.raise_for_status()
-
+
# Capture important properties from the result
feed = f.json()
feed_id = feed["feedId"]
@@ -118,17 +123,17 @@ def get_existing_feed(**kwargs):
else:
ctx.logger.info("Found existing feed with feed name {0}".format(ctx.node.properties["feed_name"]))
- except ValueError as e:
- ctx.logger.error("{er}".format(er=e))
+ except ValueError as e:
+ ctx.logger.error("{er}".format(er=e))
raise NonRecoverableError(e)
- except Exception as e:
+ except Exception as e:
if feed_id_input:
- ctx.logger.error("Error getting existing feed id {id}: {er}".format(id=ctx.node.properties["feed_id"],er=e))
+ ctx.logger.error("Error getting existing feed id {id}: {er}".format(id=ctx.node.properties["feed_id"],er=e))
else:
- ctx.logger.error("Error getting existing feed name {name}: {er}".format(name=ctx.node.properties["feed_name"],er=e))
+ ctx.logger.error("Error getting existing feed name {name}: {er}".format(name=ctx.node.properties["feed_name"],er=e))
raise NonRecoverableError(e)
-
+
@operation
def delete_feed(**kwargs):
'''
diff --git a/dmaap/dmaapplugin/mr_lifecycle.py b/dmaap/dmaapplugin/mr_lifecycle.py
index a4f04ec..ec674de 100644
--- a/dmaap/dmaapplugin/mr_lifecycle.py
+++ b/dmaap/dmaapplugin/mr_lifecycle.py
@@ -28,12 +28,14 @@ from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
def create_topic(**kwargs):
'''
Creates a message router topic.
- Allows 'topic_name', 'topic_description', 'txenable', 'replication_case',
- and 'global_mr_url' as optional node properties. If 'topic_name' is not set,
+ Allows 'topic_name', 'topic_description', 'txenable', 'replication_case', 'global_mr_url',
+ and 'useExisting' as optional node properties. If 'topic_name' is not set,
generates a random one.
Sets 'fqtn' in the instance runtime_properties.
Note that 'txenable' is a Message Router flag indicating whether transactions
are enabled on the topic.
+ Note that 'useExisting' is a flag indicating whether DBCL will use existing topic if
+ the topic already exists.
'''
try:
# Make sure there's a topic_name
@@ -66,11 +68,15 @@ def create_topic(**kwargs):
else:
global_mr_url = None
+ if "useExisting" in ctx.node.properties:
+ useExisting = ctx.node.properties["useExisting"]
+ else:
+ useExisting = False
# Make the request to the controller
dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
ctx.logger.info("Attempting to create topic name {0}".format(topic_name))
- t = dmc.create_topic(topic_name, topic_description, txenable, DMAAP_OWNER, replication_case, global_mr_url)
+ t = dmc.create_topic(topic_name, topic_description, txenable, DMAAP_OWNER, replication_case, global_mr_url, useExisting)
t.raise_for_status()
# Capture important properties from the result