summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.gitignore1
-rw-r--r--dmaap/dmaap.yaml8
-rw-r--r--dmaap/dmaapcontrollerif/dmaap_requests.py20
-rw-r--r--dmaap/dmaapplugin/dr_lifecycle.py25
-rw-r--r--dmaap/dmaapplugin/mr_lifecycle.py12
-rw-r--r--dmaap/setup.py2
6 files changed, 46 insertions, 22 deletions
diff --git a/.gitignore b/.gitignore
index 71d606c..1c61431 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,3 +12,4 @@ helm/nosetests.xml
**/*.wgn
**/*.egg-info
**/.testenv
+.vscode/*
diff --git a/dmaap/dmaap.yaml b/dmaap/dmaap.yaml
index ed56cf7..4a47a7f 100644
--- a/dmaap/dmaap.yaml
+++ b/dmaap/dmaap.yaml
@@ -25,7 +25,7 @@ plugins:
dmaapplugin:
executor: 'central_deployment_agent'
package_name: dmaap
- package_version: 1.3.4
+ package_version: 1.3.5
node_types:
@@ -47,6 +47,9 @@ node_types:
aspr_classification:
type: string
required: false
+ useExisting:
+ type: boolean
+ required: false
interfaces:
cloudify.interfaces.lifecycle:
@@ -115,6 +118,9 @@ node_types:
global_mr_url:
type: string
required: false
+ useExisting:
+ type: boolean
+ required: false
interfaces:
cloudify.interfaces.lifecycle:
diff --git a/dmaap/dmaapcontrollerif/dmaap_requests.py b/dmaap/dmaapcontrollerif/dmaap_requests.py
index 813a1d8..231953a 100644
--- a/dmaap/dmaapcontrollerif/dmaap_requests.py
+++ b/dmaap/dmaapcontrollerif/dmaap_requests.py
@@ -89,11 +89,11 @@ class DMaaPControllerHandle(object):
### PUBLIC API ###
# Data Router Feeds
- def create_feed(self, name, version=None, description=None, aspr_class=None, owner=None):
+ def create_feed(self, name, version=None, description=None, aspr_class=None, owner=None, useExisting=None):
'''
Create a DMaaP data router feed with the given feed name
and (optionally) feed version, feed description, ASPR classification,
- and owner
+ owner, and useExisting flag
'''
feed_definition = {'feedName' : name}
if version:
@@ -104,8 +104,11 @@ class DMaaPControllerHandle(object):
feed_definition['asprClassification'] = aspr_class
if owner:
feed_definition['owner'] = owner
+ feeds_path_query = self.feeds_path
+ if useExisting == True: # It's a boolean!
+ feeds_path_query += "?useExisting=true"
- return self._create_resource(self.feeds_path, feed_definition)
+ return self._create_resource(feeds_path_query, feed_definition)
def get_feed_info(self, feed_id):
'''
@@ -197,12 +200,12 @@ class DMaaPControllerHandle(object):
return self._delete_resource("{0}/{1}".format(self.subs_path, sub_id))
# Message router topics
- def create_topic(self, name, description = None, txenable = None, owner = None, replication_case = None, global_mr_url = None):
+ def create_topic(self, name, description = None, txenable = None, owner = None, replication_case = None, global_mr_url = None, useExisting = None):
'''
Create a message router topic with the topic name 'name' and optionally the topic_description
- 'description', the 'txenable' flag and the topic owner 'owner'.
+ 'description', the 'txenable' flag, the 'useExisting' flag and the topic owner 'owner'.
'''
- topic_definition = {'topicName' : name};
+ topic_definition = {'topicName' : name}
if description:
topic_definition['topicDescription'] = description
if owner:
@@ -213,8 +216,11 @@ class DMaaPControllerHandle(object):
topic_definition['replicationCase'] = replication_case
if global_mr_url:
topic_definition['globalMrURL'] = global_mr_url
+ topics_path_query = self.topics_path
+ if useExisting == True: # It's a boolean!
+ topics_path_query += "?useExisting=true"
- return self._create_resource(self.topics_path, topic_definition)
+ return self._create_resource(topics_path_query, topic_definition)
def get_topic_info(self, fqtn):
'''
diff --git a/dmaap/dmaapplugin/dr_lifecycle.py b/dmaap/dmaapplugin/dr_lifecycle.py
index 29811fa..718158a 100644
--- a/dmaap/dmaapplugin/dr_lifecycle.py
+++ b/dmaap/dmaapplugin/dr_lifecycle.py
@@ -31,9 +31,10 @@ def create_feed(**kwargs):
Create a new data router feed
Expects "feed_name" to be set in node properties
If 'feed_name' is not set or is empty, generates a random one.
- Allows "feed_version", "feed_description", and "aspr_classification" as optional properties
+ Allows "feed_version", "feed_description", "aspr_classification" and "useExisting" as optional properties
(Sets default values if not provided )
Sets instance runtime properties:
+ Note that 'useExisting' is a flag indicating whether DBCL will use existing feed if the feed already exists.
- "feed_id"
- "publish_url"
- "log_url"
@@ -58,11 +59,15 @@ def create_feed(**kwargs):
aspr_classification = ctx.node.properties["aspr_classification"]
else:
aspr_classification = "unclassified"
+ if "useExisting" in ctx.node.properties.keys():
+ useExisting = ctx.node.properties["useExisting"]
+ else:
+ useExisting = False
# Make the request to the controller
dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
ctx.logger.info("Attempting to create feed name {0}".format(feed_name))
- f = dmc.create_feed(feed_name, feed_version, feed_description, aspr_classification, DMAAP_OWNER)
+ f = dmc.create_feed(feed_name, feed_version, feed_description, aspr_classification, DMAAP_OWNER, useExisting)
f.raise_for_status()
# Capture important properties from the result
@@ -102,11 +107,11 @@ def get_existing_feed(**kwargs):
if f is None:
ctx.logger.error("Not find existing feed with feed name {0}".format(feed_name))
raise ValueError("Not find existing feed with feed name " + feed_name)
- else:
+ else:
raise ValueError("Either feed_id or feed_name must be defined to get existing feed")
f.raise_for_status()
-
+
# Capture important properties from the result
feed = f.json()
feed_id = feed["feedId"]
@@ -118,17 +123,17 @@ def get_existing_feed(**kwargs):
else:
ctx.logger.info("Found existing feed with feed name {0}".format(ctx.node.properties["feed_name"]))
- except ValueError as e:
- ctx.logger.error("{er}".format(er=e))
+ except ValueError as e:
+ ctx.logger.error("{er}".format(er=e))
raise NonRecoverableError(e)
- except Exception as e:
+ except Exception as e:
if feed_id_input:
- ctx.logger.error("Error getting existing feed id {id}: {er}".format(id=ctx.node.properties["feed_id"],er=e))
+ ctx.logger.error("Error getting existing feed id {id}: {er}".format(id=ctx.node.properties["feed_id"],er=e))
else:
- ctx.logger.error("Error getting existing feed name {name}: {er}".format(name=ctx.node.properties["feed_name"],er=e))
+ ctx.logger.error("Error getting existing feed name {name}: {er}".format(name=ctx.node.properties["feed_name"],er=e))
raise NonRecoverableError(e)
-
+
@operation
def delete_feed(**kwargs):
'''
diff --git a/dmaap/dmaapplugin/mr_lifecycle.py b/dmaap/dmaapplugin/mr_lifecycle.py
index a4f04ec..ec674de 100644
--- a/dmaap/dmaapplugin/mr_lifecycle.py
+++ b/dmaap/dmaapplugin/mr_lifecycle.py
@@ -28,12 +28,14 @@ from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
def create_topic(**kwargs):
'''
Creates a message router topic.
- Allows 'topic_name', 'topic_description', 'txenable', 'replication_case',
- and 'global_mr_url' as optional node properties. If 'topic_name' is not set,
+ Allows 'topic_name', 'topic_description', 'txenable', 'replication_case', 'global_mr_url',
+ and 'useExisting' as optional node properties. If 'topic_name' is not set,
generates a random one.
Sets 'fqtn' in the instance runtime_properties.
Note that 'txenable' is a Message Router flag indicating whether transactions
are enabled on the topic.
+ Note that 'useExisting' is a flag indicating whether DBCL will use existing topic if
+ the topic already exists.
'''
try:
# Make sure there's a topic_name
@@ -66,11 +68,15 @@ def create_topic(**kwargs):
else:
global_mr_url = None
+ if "useExisting" in ctx.node.properties:
+ useExisting = ctx.node.properties["useExisting"]
+ else:
+ useExisting = False
# Make the request to the controller
dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
ctx.logger.info("Attempting to create topic name {0}".format(topic_name))
- t = dmc.create_topic(topic_name, topic_description, txenable, DMAAP_OWNER, replication_case, global_mr_url)
+ t = dmc.create_topic(topic_name, topic_description, txenable, DMAAP_OWNER, replication_case, global_mr_url, useExisting)
t.raise_for_status()
# Capture important properties from the result
diff --git a/dmaap/setup.py b/dmaap/setup.py
index 955f181..c423d95 100644
--- a/dmaap/setup.py
+++ b/dmaap/setup.py
@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup(
name = "dmaap",
- version = "1.3.4",
+ version = "1.3.5",
packages=find_packages(),
author = "AT&T",
description = ("Cloudify plugin for creating DMaaP feeds and topics, and setting up publishers and subscribers."),