summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJack Lucas <jflucas@research.att.com>2017-09-01 13:48:08 +0000
committerJack Lucas <jflucas@research.att.com>2017-09-01 13:50:18 +0000
commit0619cb3772a12e0a5a3aecc09b96eeb7df20a000 (patch)
treeb8a921e0e41578163f7c2db41450301c374361df
parentac3f029edc6bc8b25ab44666b5408c8929e6ba92 (diff)
Add seed code for DMaaP plugin
Change-Id: I8c7a9c432badd3052a571ed87b9b580760b376e6 Issue-Id: CCSDK-65 Signed-off-by: Jack Lucas <jflucas@research.att.com>
-rw-r--r--dmaap/.gitignore69
-rw-r--r--dmaap/LICENSE.txt17
-rw-r--r--dmaap/README.md322
-rw-r--r--dmaap/consulif/__init__.py0
-rw-r--r--dmaap/consulif/consulif.py120
-rw-r--r--dmaap/dmaap.yaml193
-rw-r--r--dmaap/dmaapcontrollerif/__init__.py1
-rw-r--r--dmaap/dmaapcontrollerif/dmaap_requests.py256
-rw-r--r--dmaap/dmaapplugin/__init__.py46
-rw-r--r--dmaap/dmaapplugin/dmaaputils.py28
-rw-r--r--dmaap/dmaapplugin/dr_bridge.py198
-rw-r--r--dmaap/dmaapplugin/dr_lifecycle.py121
-rw-r--r--dmaap/dmaapplugin/dr_relationships.py211
-rw-r--r--dmaap/dmaapplugin/mr_lifecycle.py121
-rw-r--r--dmaap/dmaapplugin/mr_relationships.py119
-rw-r--r--dmaap/requirements.txt1
-rw-r--r--dmaap/setup.py16
-rw-r--r--dmaap/tests/test_plugin.py26
-rw-r--r--dmaap/tox.ini26
19 files changed, 1891 insertions, 0 deletions
diff --git a/dmaap/.gitignore b/dmaap/.gitignore
new file mode 100644
index 0000000..af8e550
--- /dev/null
+++ b/dmaap/.gitignore
@@ -0,0 +1,69 @@
+.project
+.pydevproject
+wheels
+venv
+cdap.zip
+docker.zip
+*.wgn
+*.swp
+*.swn
+*.swo
+.DS_Store
+
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+env/
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+*.egg-info/
+.installed.cfg
+*.egg
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*,cover
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+target/
diff --git a/dmaap/LICENSE.txt b/dmaap/LICENSE.txt
new file mode 100644
index 0000000..f90f8f1
--- /dev/null
+++ b/dmaap/LICENSE.txt
@@ -0,0 +1,17 @@
+============LICENSE_START=======================================================
+org.onap.ccsdk
+================================================================================
+Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+================================================================================
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+============LICENSE_END=========================================================
diff --git a/dmaap/README.md b/dmaap/README.md
new file mode 100644
index 0000000..646400f
--- /dev/null
+++ b/dmaap/README.md
@@ -0,0 +1,322 @@
+## Cloudify DMaaP Plugin
+Cloudify plugin for creating and managing DMaaP Data Router feeds and subscriptions and
+DMaaP Message Router topics. The plugin uses the DMaaP Bus Controller API.
+
+### Plugin Support for DMaaP Data Router
+#### Plugin Types for DMaaP Data Router
+The Cloudify type definitions for DMaaP Data Router nodes and relationships
+are defined in [`dmaap.yaml`](./dmaap.yaml).
+
+There are four node types for DMaaP Data Router:
+
+- `ccsdk.nodes.Feed`: This type represents a feed that does not yet
+exist and that should be created when the install workflow is
+run against a blueprint that contains a node of this type.
+
+Property|Type|Required?|Description |
+--------|----|---------|---------------------------------------
+feed_name|string|no|a name that identifies the feed (plugin will generate if absent)
+feed_version|string|no|version number for the feed (feed_name + feed_version uniquely identify the feed in DR)
+feed_description|string|no|human-readable description of the feed
+aspr_classification|string|no|AT&T ASPR classification of the feed
+
+
+- `ccsdk.nodes.ExistingFeed`: This type represents a feed that
+already exists. Nodes of this type are placed in a blueprint so
+that other nodes in the blueprint can be set up as publishers or
+subscribers to the feed. The table below shows the properties that a node
+of this type may have.
+
+Property|Type|Required?|Description
+--------|----|---------|----------------------------------------
+feed_id|string|yes|Feed identifier assigned by DMaaP when the feed was created
+
+- `ccsdk.nodes.ExternalTargetFeed`: This type represents a feed created in an external DMaaP
+environment (i.e., an environment that the plugin cannot access to make provisioning requests, such as
+a shared corporate system). Nodes of this type are placed in a blueprint so that other feed nodes of
+type `ccsdk.nodes.Feed` or `ccsdk.nodes.ExistingFeed` can be set up to "bridge" to external feeds by
+publishing data to the external feeds. The table below shows the properties that a node of this type
+may have.
+
+Property|Type|Required?|Description
+--------|----|---------|----------------------------------------
+url|string|yes|The publish URL of the external feed.
+username|string|yes|The username to be used when delivering to the external feed
+userpw|string|yes|The password to be used when delivering to the external feed
+
+_Note: These properties are usually obtained by manually creating a feed in the external
+DMaaP DR system and then creating a publisher for that feed._
+
+- `ccsdk.nodes.ExternalSourceFeed`: This type represents a feed created in an external DMaaP
+environment (i.e., an environment that the plugin cannot access to makes provisioning requests, such as
+a shared corporate system). Nodes of this type are place in a blueprint so that they can be set up to
+"bridge" to other feed nodes of type `ccsdk.nodes.Feed` or `ccsdk.nodes.ExistingFeed`. This type
+has no node properties, but when a bridge is set up, the url, username, and password are attached to the
+node as runtime_properties, using the name of the target feed node as the top-level key.
+
+There are five relationship types for DMaaP Data Router:
+
+- `ccsdk.relationships.publish_files`, used to
+indicate that the relationship's source node sends is a publisher to the
+Data Router feed represented by the relationship's target node.
+- `ccsdk.relationships.subscribe_to_files`, used to
+indicate that the relationship's source node is a subscriber to the
+Data Router feed represented by the relationship's target node.
+- `ccsdk.relationships.bridges_to`, used to indicate that the relationship's source
+node (a `ccsdk.nodes.Feed` or `ccsdk.nodes.ExistingFeed`) should be set up
+to forward data ("bridge") to the relationship's target feed (another `ccsdk.nodes.Feed` or
+`ccsdk.nodes.ExistingFeed`).
+- `ccsdk.relationships.bridges_to_external`, used to indicate that the relationship's source
+node (a `ccsdk.nodes.Feed` or `ccsdk.nodes.ExistingFeed`) should be set up
+to forward data ("bridge") to the relationship's target node (a feed in an external DMaaP system,
+represented by a `ccsdk.nodes.ExternalTargetFeed` node).
+- `ccsdk.relationships.bridges_from_external_to_internal`, used to indicate the the relationship's source
+node (a feed in an external DMaaP system, represented by a `ccsdk.nodes.ExternalSourceFeed` node) should be set up to forward date ("bridge")
+to the relationship's target node (an internal ONAP feed, represented by a `ccsdk.nodes.Feed` or `ccsdk.nodes.ExistingFeed` node).
+
+The plugin code implements the lifecycle operations needed to create and
+delete feeds and to add and remove publishers and subscribers. It also implements
+the operations needed to set up bridging between feeds.
+
+#### Interaction with Other Plugins
+When creating a new feed or processing a reference to an existing feed,
+the plugin operates independently of other plugins.
+
+When processing a `ccsdk.relationships.publish_files` relationship or a
+`ccsdk.relationships.subscribe_to_files` relationship, this plugin needs
+to obtain data from the source node and, in the case of `publish_files`, provide
+data to the source node. Certain conventions are therefore needed for
+passing data between this plugin and the plugins responsible for the source
+nodes in these relationships. In Cloudify, the mechanism for
+sharing data among plugins is the `ctx.instance.runtime_properties` dictionary
+associated with each node.
+
+A given source node may have relationships with several feeds. For example, an ONAP DCAE
+data collector might publish two different types of data to two different feeds. An ONAP DCAE
+analytics module might subscribe to one feed to get input for its processing and
+publish its results to a different feed. When this DMaaP plugin and the plugin for the
+source node exchange information, they need to do in a way that lets them distinguish
+among different feeds. We do this through a simple convention: for each source node
+to feed relationship, the source node plugin will create a property in the source node's
+`runtime_properties` dictionary. The name of the property will be the same as the
+name of the target node of the relationship. For instance, if a node has a
+`publishes_files` relationship with a target node named `feed00`, then the plugin that's
+responsible for managing the source node with create an entry in the source node's
+`runtime_properties` dictionary named `feed00`. This entry itself will be a dictionary.
+
+The content of this data exchange dictionary depends on whether the source node is a
+publisher (i.e., the relationship is `publish_files`) or a subscriber (i.e., the
+relationship is `subscribe_to_files`).
+
+For the `publish_files` relationship, the data exchange dictionary has the following
+properties:
+
+Property|Set by|Description
+--------|------|------------------------------------------------
+location|source node plugin|the DMaaP location for the publisher, used to set up routing
+publish_url|DMaaP plugin|the URL to which the publisher makes Data Router publish requests
+log_url|DMaaP plugin|the URL from which log data for the feed can be obtained
+username|DMaaP plugin|the username (generated by the DMaaP plugin) the publisher uses to authenticate to Data Router
+password|DMaaP plugin|the password (generated by the DMaaP plugin) the publisher uses to authenticate to Data Router
+
+For the `subscribe_to_files` relationship, the data exchange dictionary has the following
+properties:
+
+Property|Set by|Description
+--------|------|------------------------------------------------
+location|source node plugin|the DMaaP location for the subscriber, used to set up routing
+delivery_url|source node plugin|the URL to which the Data Router should deliver files
+username|source node plugin|the username Data Router uses to authenticate to the subscriber when delivering files
+password|source node plugin|the username Data Router uses to authenticate to the subscriber when delivering file
+
+### Plugin Support for DMaaP Message Router
+#### Plugin Types for DMaaP Message Router
+The Cloudify type definitions for DMaaP Message Router nodes and relationships
+are defined in [`dmaap.yaml`](./dmaap.yaml).
+
+There are two node types for DMaaP Message Router:
+
+- `ccsdk.nodes.Topic`: This type represents a topic that does not yet
+exist and that should be created when the install workflow is
+run against a blueprint that contains a node of this type.
+
+Property|Type|Required?|Description
+--------|----|---------|---------------------------------------
+topic_name|string|no|a name that uniquely identifies the feed (plugin will generate if absent)
+topic_description|string|no|human-readable description of the feed
+txenable|boolean|no|flag indicating whether transactions are enabled for this topic
+replication_case|string|no|type of replication required for the topic (defaults to no replication)
+global_mr_url|string|no|Global MR host name for replication to a global MR instance
+
+Note: In order to set up topics, a user should be familiar with message router and how it is configured,
+and this README is not the place to explain the details of message router. Here are a couple of pieces of
+information that might be helpful.
+Currently, the allowed values for `replication_case` are:
+
+- `REPLICATION_NONE`
+- `REPLICATION_EDGE_TO_CENTRAL`
+- `REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL`
+- `REPLICATION_CENTRAL_TO_EDGE`
+- `REPLICATION_CENTRAL_TO_GLOBAL`
+- `REPLICATION_GLOBAL_TO_CENTRAL`
+- `REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE`
+
+The `global_mr_url` is actually a host name, not a full URL. It points to a host in a global message router
+cluster. (A 'global' message router cluster is one that's not part of ONAP.)
+
+- `ccsdk.nodes.ExistingTopic`: This type represents a topic that
+already exists. Nodes of this type are placed in a blueprint so
+that other nodes in the blueprint can be set up as publishers or
+subscribers to the topic. The table below shows the properties that a node
+of this type may have.
+
+Property|Type|Required?|Description
+--------|----|---------|----------------------------------------
+fqtn|string|yes|fully-qualified topic name for the topic
+
+#### Interaction with Other Plugins
+When creating a new topic or processing a reference to an existing topic,
+the plugin operates independently of other plugins.
+
+When processing a `ccsdk.relationships.publish_events` relationship or a
+`ccsdk.relationships.subscribe_to_events` relationship, this plugin needs
+to obtain data from and provide data to the source node. Certain conventions are therefore needed for
+passing data between this plugin and the plugins responsible for the source
+nodes in these relationships. In Cloudify, the mechanism for
+sharing data among plugins is the `ctx.instance.runtime_properties` dictionary
+associated with each node.
+
+A given source node may have relationships with several topics. For example, an ONAP DCAE
+analytics module might subscribe to one topic to get input for its processing and
+publish its results to a different topic. When this DMaaP plugin and the plugin for the
+source node exchange information, they need to do in a way that lets them distinguish
+among different feeds. We do this through a simple convention: for each source node
+to topic relationship, the source node plugin will create a property in the source node's
+`runtime_properties` dictionary. The name of the property will be the same as the
+name of the target node of the relationship. For instance, if a node has a
+`publishes_events` relationship with a target node named `topic00`, then the plugin that's
+responsible for managing the source node with create an entry in the source node's
+`runtime_properties` dictionary named `topic00`. This entry itself will be a dictionary.
+
+For both types of relationship, the data exchange dictionary has the following
+properties:
+
+Property|Set by|Description
+--------|------|------------------------------------------------
+location|source node plugin|the DMaaP location for the publisher or subscriber, used to set up routing
+client_role|source node plugin|the AAF client role that's requesting publish or subscribe access to the topic
+topic_url|DMaaP plugin|the URL for accessing the topic to publish or receive events
+
+### Interaction with Consul configuration store
+In addition to storing the results of DMaaP Data Router and DMaaP Message Router provisioning operations in `runtime_properties`,
+the DMaaP plugin also stores these results into the ONAP configuration store, which resides in a
+[Consul key-value store](https://www.consul.io/). This allows DMaaP clients (components that act as publishers, subscribers, or both)
+to retrieve their DMaaP configuration information from Consul, rather than having the plugin that deploys the client directly
+configure the client using data in `runtime_properties`.
+
+The `runtime_properties` for a client must contain a property called `service_component_name`. If this property is not present,
+the plugin will raise a NonRecoverableError and cause the installation to fail.
+
+If `service_component_name` is present, then the plugin will use a Consul key consisting of the value
+of `service_component_name` prepended to the fixed string `:dmaap`. For example, if the `service_component_name`
+is `client123`, the plugin will use `client123:dmaap` as the key for storing DMaaP information into Consul.
+Information for all of the feeds and topics for a client are stored under the same key.
+
+The value stored is a nested JSON object. At the top level of the object are properties representing each topic or feed
+for which the component is a publisher or subscriber. The name of the property is the node name of the target feed or topic.
+The value of the property is another JSON object that corresponds to the dictionary that the plugin created in
+`runtime_properties` corresponding to the target feed or topic. Note that the information in Consul includes
+all of the properties for the feed or topic, those set by the source node plugin as well as those set by the DMaaP plugin.
+
+Examples:
+
+Data Router publisher, target feed `feed00`:
+```
+{
+ "feed00": {
+ "username": "rC9QR51I",
+ "log_url": "https://dmaap.example.com/feedlog/972",
+ "publish_url": "https://dmaap.example.com/publish/972",
+ "location": "loc00",
+ "password": "QOQeUh5KLR",
+ "publisher_id": "972.360gm"
+ }
+}
+```
+
+Data Router subscriber, target feed `feed01`:
+```
+{
+ "feed01": {
+ "username": "drdeliver",
+ "password": "1loveDataR0uter",
+ "location": "loc00",
+ "delivery_url": "https://example.com/whatever",
+ "subscriber_id": "1550"
+ }
+}
+```
+
+Message Router publisher to `topic00`, subscriber to `topic01`. Note how each topic
+appears as a top-level property in the object.
+```
+{
+ "topic00": {
+ "topic_url": "https://dmaap.example.com:3905/events/org.onap.ccsdk.dmaap.FTL2.outboundx",
+ "client_role": "org.onap.ccsdk.member",
+ "location": "loc00",
+ "client_id": "1494621774522"
+ },
+ "topic01": {
+ "topic_url": "https://dmaap.example.com:3905/events/org.onap.ccsdk.dmaap.FTL2.inboundx",
+ "client_role": "org.onap.ccsdk.member",
+ "location": "loc00",
+ "client_id": "1494621778627"
+ }
+}
+```
+
+### Packaging and installing
+The DMaaP plugin is meant to be used as a [Cloudify managed plugin](http://docs.getcloudify.org/3.4.0/plugins/using-plugins/). Managed plugins
+are packaged using [`wagon`](https://github.com/cloudify-cosmo/wagon).
+
+To package this plugin, executing the following command in the top-level directory of this plugin, from a Python environment in which `wagon` has been installed:
+```
+wagon create -s . -r -o /path/to/directory/for/wagon/output
+```
+Once the wagon file is built, it can be uploaded to a Cloudify Manager host using the `cfy plugins upload` command described in the documentation above.
+
+Managed plugins can also be loaded at the time a Cloudify Manager host is installed, via the installation blueprint and inputs file. We expect that this plugin will
+be loaded at Cloudify Manager installation time, and that `cfy plugins upload` will be used only for delivering patches between releases.
+
+### Configuration
+The plugin needs to be configured with certain parameters needed to access the DMaaP Bus Controller. In keeping with the ONAP architecture, this information is
+stored in Consul.
+
+The plugin finds the address and port of the DMaaP Bus Controller using the Consul service discovery facility. The plugin expects the Bus Controller to be
+registered under the name `dmaap_bus_controller`.
+
+Additional parameters come from the `dmaap` key in the Cloudify Manager's Consul configuration, which is stored in the Consul KV store under the key name
+'cloudify_manager'. The table below lists the properties in the configuration:
+
+Property|Type|Required?|Default|Description
+--------|----|---------|-------|--------------------------------
+`username`|string|Yes|(none)|The username for logging into DMaaP Bus Controller
+`password`|string|Yes|(none)|The password for logging into DMaaP Bus Controller
+`owner`|string|Yes|(none)|The name to be used as the owner for entities created by the plugin
+`protocol`|string|No|`https`|The protocol (URL scheme) used to access the DMaaP bus controller (`http` or `https`)
+`path`|string|No|`webapi`|The path to the root of the DMaaP Bus Controller API endpoint
+
+Here is an example of a Cloudify Manager configuration object showing only the `dmaap` key:
+```
+{
+ "dmaap": {
+ "username": "dmaap.client@ccsdkorch.onap.org",
+ "password": "guessmeifyoucan"
+ "owner": "ccsdkorc"
+ },
+
+ (other configuration here)
+
+}
+```
diff --git a/dmaap/consulif/__init__.py b/dmaap/consulif/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/dmaap/consulif/__init__.py
diff --git a/dmaap/consulif/consulif.py b/dmaap/consulif/consulif.py
new file mode 100644
index 0000000..e742895
--- /dev/null
+++ b/dmaap/consulif/consulif.py
@@ -0,0 +1,120 @@
+# ============LICENSE_START====================================================
+# org.onap.ccsdk
+# =============================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+import consul
+import json
+from urlparse import urlparse
+
+class ConsulHandle(object):
+ '''
+ Provide access to Consul KV store and service discovery
+ '''
+
+ def __init__(self, api_url, user, password, logger):
+ '''
+ Constructor
+ '''
+ u = urlparse(api_url)
+ self.ch = consul.Consul(host=u.hostname, port=u.port, scheme=u.scheme)
+
+ def get_config(self, key):
+ '''
+ Get configuration information from Consul using the provided key.
+ It should be in JSON form. Convert it to a dictionary
+ '''
+ (index, val) = self.ch.kv.get(key)
+ config = json.loads(val['Value']) # will raise ValueError if not JSON, let it propagate
+ return config
+
+ def get_service(self,service_name):
+ '''
+ Look up the service named service_name in Consul.
+ Return the service address and port.
+ '''
+ (index, val) = self.ch.catalog.service(service_name)
+ if len(val) > 0: # catalog.service returns an empty array if service not found
+ service = val[0] # Could be multiple listings, but we take the first
+ if ('ServiceAddress' in service) and (len(service['ServiceAddress']) > 0):
+ service_address = service['ServiceAddress'] # Most services should have this
+ else:
+ service_address = service['Address'] # "External" services will have this only
+ service_port = service['ServicePort']
+ else:
+ raise Exception('Could not find service information for "{0}"'.format(service_name))
+
+ return service_address, service_port
+
+ def add_to_entry(self, key, add_name, add_value):
+ '''
+ Find 'key' in consul.
+ Treat its value as a JSON string representing a dict.
+ Extend the dict by adding an entry with key 'add_name' and value 'add_value'.
+ Turn the resulting extended dict into a JSON string.
+ Store the string back into Consul under 'key'.
+ Watch out for conflicting concurrent updates.
+
+ Example:
+ Key 'xyz:dmaap' has the value '{"feed00": {"feed_url" : "http://example.com/feeds/999"}}'
+ add_to_entry('xyz:dmaap', 'topic00', {'topic_url' : 'http://example.com/topics/1229'})
+ should result in the value for key 'xyz:dmaap' in consul being updated to
+ '{"feed00": {"feed_url" : "http://example.com/feeds/999"}, "topic00" : {"topic_url" : "http://example.com/topics/1229"}}'
+ '''
+
+ while True: # do until update succeeds
+ (index, val) = self.ch.kv.get(key) # index gives version of key retrieved
+
+ if val is None: # no key yet
+ vstring = '{}'
+ mod_index = 0 # Use 0 as the cas index for initial insertion of the key
+ else:
+ vstring = val['Value']
+ mod_index = val['ModifyIndex']
+
+ # Build the updated dict
+ # Exceptions just propagate
+ v = json.loads(vstring)
+ v[add_name] = add_value
+ new_vstring = json.dumps(v)
+
+ updated = self.ch.kv.put(key, new_vstring, cas=mod_index) # if the key has changed since retrieval, this will return false
+ if updated:
+ break
+
+
+ def delete_entry(self,entry_name):
+ '''
+ Delete an entire key-value entry whose key is 'entry_name' from the Consul KV store.
+
+ Note that the kv.delete() operation always returns True,
+ whether there's an entry with key 'entry_name' exists or not. This doesn't seem like
+ a great design, but it means it's safe to try to delete the same entry repeatedly.
+
+ Note also in our application for this plugin, the uninstall workflow will always delete all of the topics and
+ feeds we've stored into the 'component_name:dmaap' entry.
+
+ Given the two foregoing notes, it is safe for this plugin to attempt to delete the entire
+ 'component_name:dmaap' entry any time it performs an 'unlink' operation for a publishes or
+ subscribes relationship. The first unlink will actually remove the entry, the subsequent ones
+ will harmlessly try to remove it again.
+
+ The 'correct' approach would be to have a delete_from_entry(self, key, delete_name) that fetches
+ the entry from Consul, removes only the topic or feed being unlinked, and then puts the resulting
+ entry back into Consul. It would be very similar to add_from_entry. When there's nothing left
+ in the entry, then the entire entry would be deleted.
+ '''
+ self.ch.kv.delete(entry_name)
diff --git a/dmaap/dmaap.yaml b/dmaap/dmaap.yaml
new file mode 100644
index 0000000..1c3ff43
--- /dev/null
+++ b/dmaap/dmaap.yaml
@@ -0,0 +1,193 @@
+# ============LICENSE_START====================================================
+# org.onap.ccsdk
+# =============================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+
+# Types and relationships for DMaaP data router feeds
+
+tosca_definitions_version: cloudify_dsl_1_3
+
+imports:
+ - http://www.getcloudify.org/spec/cloudify/3.4/types.yaml
+
+plugins:
+ dmaapplugin:
+ executor: 'central_deployment_agent'
+ package_name: cloudifydmaapplugin
+ package_version: 1.2.0
+
+
+node_types:
+
+ # Data Router feed to be created
+ ccsdk.nodes.Feed:
+ derived_from: cloudify.nodes.Root
+
+ properties:
+ feed_name:
+ type: string
+ required: false
+ feed_version:
+ type: string
+ required: false
+ feed_description:
+ type: string
+ required: false
+ aspr_classification:
+ type: string
+ required: false
+
+ interfaces:
+ cloudify.interfaces.lifecycle:
+ create:
+ implementation:
+ dmaapplugin.dmaapplugin.dr_lifecycle.create_feed
+ delete:
+ implementation:
+ dmaapplugin.dmaapplugin.dr_lifecycle.delete_feed
+
+ # Existing Data Router feed to be used as target for publishing/subscribing
+ ccsdk.nodes.ExistingFeed:
+ derived_from: cloudify.nodes.Root
+
+ properties:
+ feed_id:
+ type: string
+ required: true
+
+ interfaces:
+ cloudify.interfaces.lifecycle:
+ configure:
+ implementation:
+ dmaapplugin.dmaapplugin.dr_lifecycle.get_existing_feed
+
+ # Existing Global Data Router feed (created via Invenio) to be used as target for bridging
+ ccsdk.nodes.ExternalTargetFeed:
+ derived_from: cloudify.nodes.Root
+
+ properties:
+ url:
+ type: string
+ required: true
+ username:
+ type: string
+ required: true
+ userpw:
+ type: string
+ required: true
+
+ # Global Data Router feed to be used as a source for bridging
+ # Has no properties
+ ccsdk.nodes.ExternalSourceFeed:
+ derived_from: cloudify.nodes.Root
+
+ # Message Router topic to be created
+ ccsdk.nodes.Topic:
+ derived_from: cloudify.nodes.Root
+
+ properties:
+ topic_name:
+ type: string
+ required: false
+ topic_description:
+ type: string
+ required: false
+ txenable:
+ type: boolean
+ required: false
+ replication_case:
+ type: string
+ required: false
+ global_mr_url:
+ type: string
+ required: false
+
+ interfaces:
+ cloudify.interfaces.lifecycle:
+ create:
+ implementation:
+ dmaapplugin.dmaapplugin.mr_lifecycle.create_topic
+ delete:
+ implementation:
+ dmaapplugin.dmaapplugin.mr_lifecycle.delete_topic
+
+ # Existing Message Router topic to be used as target for publishing/subscribing
+ ccsdk.nodes.ExistingTopic:
+ derived_from: cloudify.nodes.Root
+
+ properties:
+ fqtn:
+ type: string
+ required: true
+
+ interfaces:
+ cloudify.interfaces.lifecycle:
+ configure:
+ implementation:
+ dmaapplugin.dmaapplugin.mr_lifecycle.get_existing_topic
+
+relationships:
+
+ ccsdk.relationships.publish_files:
+ derived_from: cloudify.relationships.connected_to
+ target_interfaces:
+ cloudify.interfaces.relationship_lifecycle:
+ preconfigure: dmaapplugin.dmaapplugin.dr_relationships.add_dr_publisher
+ unlink: dmaapplugin.dmaapplugin.dr_relationships.delete_dr_publisher
+
+ ccsdk.relationships.subscribe_to_files:
+ derived_from: cloudify.relationships.connected_to
+ target_interfaces:
+ cloudify.interfaces.relationship_lifecycle:
+ establish: dmaapplugin.dmaapplugin.dr_relationships.add_dr_subscriber
+ unlink: dmaapplugin.dmaapplugin.dr_relationships.delete_dr_subscriber
+
+ ccsdk.relationships.publish_events:
+ derived_from: cloudify.relationships.connected_to
+ target_interfaces:
+ cloudify.interfaces.relationship_lifecycle:
+ preconfigure: dmaapplugin.dmaapplugin.mr_relationships.add_mr_publisher
+ unlink: dmaapplugin.dmaapplugin.mr_relationships.delete_mr_client
+
+ ccsdk.relationships.subscribe_to_events:
+ derived_from: cloudify.relationships.connected_to
+ target_interfaces:
+ cloudify.interfaces.relationship_lifecycle:
+ preconfigure: dmaapplugin.dmaapplugin.mr_relationships.add_mr_subscriber
+ unlink: dmaapplugin.dmaapplugin.mr_relationships.delete_mr_client
+
+ ccsdk.relationships.bridges_to:
+ derived_from: cloudify.relationships.connected_to
+ target_interfaces:
+ cloudify.interfaces.relationship_lifecycle:
+ preconfigure: dmaapplugin.dmaapplugin.dr_bridge.create_dr_bridge
+ unlink: dmaapplugin.dmaapplugin.dr_bridge.remove_dr_bridge
+
+ ccsdk.relationships.bridges_to_external:
+ derived_from: cloudify.relationships.connected_to
+ target_interfaces:
+ cloudify.interfaces.relationship_lifecycle:
+ preconfigure: dmaapplugin.dmaapplugin.dr_bridge.create_external_dr_bridge
+ unlink: dmaapplugin.dmaapplugin.dr_bridge.remove_dr_bridge
+
+ ccsdk.relationships.bridges_from_external_to_internal:
+ derived_from: cloudify.relationships.connected_to
+ target_interfaces:
+ cloudify.interfaces.relationship_lifecycle:
+ preconfigure: dmaapplugin.dmaapplugin.dr_bridge.create_external_source_dr_bridge
+ unlink: dmaapplugin.dmaapplugin.dr_bridge.remove_dr_bridge
+
diff --git a/dmaap/dmaapcontrollerif/__init__.py b/dmaap/dmaapcontrollerif/__init__.py
new file mode 100644
index 0000000..611169f
--- /dev/null
+++ b/dmaap/dmaapcontrollerif/__init__.py
@@ -0,0 +1 @@
+# DMaaP Bus Controller interface library \ No newline at end of file
diff --git a/dmaap/dmaapcontrollerif/dmaap_requests.py b/dmaap/dmaapcontrollerif/dmaap_requests.py
new file mode 100644
index 0000000..eb6fe1b
--- /dev/null
+++ b/dmaap/dmaapcontrollerif/dmaap_requests.py
@@ -0,0 +1,256 @@
+# ============LICENSE_START====================================================
+# org.onap.ccsdk
+# =============================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+import requests
+
+### "Constants"
+FEEDS_PATH = '/feeds'
+PUBS_PATH = '/dr_pubs'
+SUBS_PATH = '/dr_subs'
+TOPICS_PATH = '/topics'
+CLIENTS_PATH = '/mr_clients'
+LOCATIONS_PATH = '/dcaeLocations'
+
+class DMaaPControllerHandle(object):
+ '''
+ A simple wrapper class to map DMaaP bus controller API calls into operations supported by the requests module
+ '''
+
+ def __init__(self, api_url, user, password, logger,
+ feeds_path = FEEDS_PATH,
+ pubs_path = PUBS_PATH,
+ subs_path = SUBS_PATH,
+ topics_path = TOPICS_PATH,
+ clients_path = CLIENTS_PATH):
+ '''
+ Constructor
+ '''
+ self.api_url = api_url # URL for the root of the Controller resource tree, no trailing "/"
+ self.auth = (user, password) # user name and password for HTTP basic auth
+ self.logger = logger
+ self.feeds_path = feeds_path
+ self.pubs_path = pubs_path
+ self.subs_path = subs_path
+ self.topics_path = topics_path
+ self.clients_path = clients_path
+
+
+ ### INTERNAL FUNCTIONS ###
+
+ def _make_url(self, path):
+ '''
+ Make a full URL given the path relative to the root
+ '''
+ if not path.startswith('/'):
+ path = '/' + path
+
+ return self.api_url + path
+
+ def _get_resource(self, path):
+ '''
+ Get the DMaaP resource at path, where path is relative to the root.
+ '''
+ url = self._make_url(path)
+ self.logger.info("Querying URL: {0}".format(url))
+ return requests.get(url, auth=self.auth)
+
+ def _create_resource(self, path, resource_content):
+ '''
+ Create a DMaaP resource by POSTing to the resource collection
+ identified by path (relative to root), using resource_content as the body of the post
+ '''
+ url = self._make_url(path)
+ self.logger.info("Posting to URL: {0} with body: {1}".format(url, resource_content))
+ return requests.post(url, auth=self.auth, json=resource_content)
+
+ def _delete_resource(self, path):
+ '''
+ Delete the DMaaP resource at path, where path is relative to the root.
+ '''
+ url = self._make_url(path)
+ self.logger.info("Deleting URL: {0}".format(url))
+ return requests.delete(url, auth=self.auth)
+
+ ### PUBLIC API ###
+
+ # Data Router Feeds
+ def create_feed(self, name, version=None, description=None, aspr_class=None, owner=None):
+ '''
+ Create a DMaaP data router feed with the given feed name
+ and (optionally) feed version, feed description, ASPR classification,
+ and owner
+ '''
+ feed_definition = {'feedName' : name}
+ if version:
+ feed_definition['feedVersion'] = version
+ if description:
+ feed_definition['feedDescription'] = description
+ if aspr_class:
+ feed_definition['asprClassification'] = aspr_class
+ if owner:
+ feed_definition['owner'] = owner
+
+ return self._create_resource(self.feeds_path, feed_definition)
+
+ def get_feed_info(self, feed_id):
+ '''
+ Get the representation of the DMaaP data router feed whose feed id is feed_id.
+ '''
+ return self._get_resource("{0}/{1}".format(self.feeds_path, feed_id))
+
+ def delete_feed(self, feed_id):
+ '''
+ Delete the DMaaP data router feed whose feed id is feed_id.
+ '''
+ return self._delete_resource("{0}/{1}".format(self.feeds_path, feed_id))
+
+ # Data Router Publishers
+ def add_publisher(self, feed_id, location, username, password, status=None):
+ '''
+ Add a publisher to feed feed_id at location location with user, pass, and status
+ '''
+ publisher_definition = {
+ 'feedId' : feed_id,
+ 'dcaeLocationName' : location,
+ 'username' : username,
+ 'userpwd' : password
+ }
+
+ if status:
+ publisher_definition['status'] = status
+
+ return self._create_resource(self.pubs_path, publisher_definition)
+
+ def get_publisher_info(self, pub_id):
+ '''
+ Get the representation of the DMaaP data router publisher whose publisher id is pub_id
+ '''
+ return self._get_resource("{0}/{1}".format(self.pubs_path, pub_id))
+
+ def delete_publisher(self, pub_id):
+ '''
+ Delete the DMaaP data router publisher whose publisher id is id.
+ '''
+ return self._delete_resource("{0}/{1}".format(self.pubs_path, pub_id))
+
+
+ # Data Router SUbscrihers
+ def add_subscriber(self, feed_id, location, delivery_url, username, password, status=None):
+ '''
+ Add a publisher to feed feed_id at location location with user, pass, and status
+ '''
+ subscriber_definition = {
+ 'feedId' : feed_id,
+ 'dcaeLocationName' : location,
+ 'deliveryURL' : delivery_url,
+ 'username' : username,
+ 'userpwd' : password
+ }
+
+ if status:
+ subscriber_definition['status'] = status
+
+ return self._create_resource(self.subs_path, subscriber_definition)
+
+ def get_subscriber_info(self, sub_id):
+ '''
+ Get the representation of the DMaaP data router subscriber whose subscriber id is sub_id
+ '''
+ return self._get_resource("{0}/{1}".format(self.subs_path, sub_id))
+
+ def delete_subscriber(self, sub_id):
+ '''
+ Delete the DMaaP data router subscriber whose subscriber id is sub_id.
+ '''
+ return self._delete_resource("{0}/{1}".format(self.subs_path, sub_id))
+
+ # Message router topics
+ def create_topic(self, name, description = None, txenable = None, owner = None, replication_case = None, global_mr_url = None):
+ '''
+ Create a message router topic with the topic name 'name' and optionally the topic_description
+ 'description', the 'txenable' flag and the topic owner 'owner'.
+ '''
+ topic_definition = {'topicName' : name};
+ if description:
+ topic_definition['topicDescription'] = description
+ if owner:
+ topic_definition['owner'] = owner
+ if txenable != None: # It's a boolean!
+ topic_definition['txenable'] = txenable
+ if replication_case:
+ topic_definition['replicationCase'] = replication_case
+ if global_mr_url:
+ topic_definition['globalMrURL'] = global_mr_url
+
+ return self._create_resource(self.topics_path, topic_definition)
+
+ def get_topic_info(self, fqtn):
+ '''
+ Get information about the topic whose fully-qualified name is 'fqtn'
+ '''
+ return self._get_resource("{0}/{1}".format(self.topics_path, fqtn))
+
+ def delete_topic(self, fqtn):
+ '''
+ Delete the topic whose fully qualified name is 'fqtn'
+ '''
+ return self._delete_resource("{0}/{1}".format(self.topics_path, fqtn))
+
+ # Message route clients (publishers and subscribers
+ def create_client(self, fqtn, location, client_role, actions):
+ '''
+ Creates a client authorized to access the topic with fully-qualified name 'fqtn',
+ from the location 'location', using the AAF client role 'client_role'. The
+ client is authorized to perform actions in the list 'actions'. (Valid
+ values are 'pub', 'sub', and 'view'
+ '''
+ client_definition = {
+ 'fqtn' : fqtn,
+ 'dcaeLocationName' : location,
+ 'clientRole' : client_role,
+ 'action' : actions
+ }
+ return self._create_resource(self.clients_path, client_definition)
+
+ def get_client_info(self, client_id):
+ '''
+ Get client information for the client whose client ID is 'client_id'
+ '''
+ return self._get_resource("{0}/{1}".format(self.clients_path, client_id))
+
+ def delete_client(self, client_id):
+ '''
+ Delete the client whose client ID is 'client_id'
+ '''
+ return self._delete_resource("{0}/{1}".format(self.clients_path, client_id))
+
+ def get_dcae_locations(self, dcae_layer):
+ '''
+ Get the list of location names known to the DMaaP bus controller
+ whose "dcaeLayer" property matches dcae_layer and whose status is "VALID".
+ "dcaeLayer" is "opendcae-central" for central sites.
+ '''
+ # Do these as a separate step so things like 404 get reported precisely
+ locations = self._get_resource(LOCATIONS_PATH)
+ locations.raise_for_status()
+
+ # pull out location names for VALID locations with matching dcae_layer
+ return map(lambda l: l["dcaeLocationName"],
+ filter(lambda i : (i['dcaeLayer'] == dcae_layer and i['status'] == 'VALID'),
+ locations.json()))
+
diff --git a/dmaap/dmaapplugin/__init__.py b/dmaap/dmaapplugin/__init__.py
new file mode 100644
index 0000000..130c0bf
--- /dev/null
+++ b/dmaap/dmaapplugin/__init__.py
@@ -0,0 +1,46 @@
+# ============LICENSE_START====================================================
+# org.onap.ccsdk
+# =============================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+## Get parameters for accessing the DMaaP controller
+from consulif.consulif import ConsulHandle
+from cloudify.exceptions import NonRecoverableError
+
+CONSUL_HOST = "127.0.0.1" # Should always be a local consul agent on Cloudify Manager
+CM_SERVICE_NAME = "cloudify_manager" # Name under which CM is registered, used as key to get config
+DBC_SERVICE_NAME= "dmaap_bus_controller" # Name under which the DMaaP bus controller is registered
+
+try:
+ _ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, None)
+ config = _ch.get_config(CM_SERVICE_NAME)
+ DMAAP_USER = config['dmaap']['username']
+ DMAAP_PASS = config['dmaap']['password']
+ DMAAP_OWNER = config['dmaap']['owner']
+ if 'protocol' in config['dmaap']:
+ DMAAP_PROTOCOL = config['dmaap']['protocol']
+ else:
+ DMAAP_PROTOCOL = 'https' # Default to https (service discovery should give us this but doesn't
+ if 'path' in config['dmaap']:
+ DMAAP_PATH = config['dmaap']['path']
+ else:
+ DMAAP_PATH = 'webapi' # SHould come from service discovery but Consul doesn't support it
+
+ service_address, service_port = _ch.get_service(DBC_SERVICE_NAME)
+ DMAAP_API_URL = '{0}://{1}:{2}/{3}'.format(DMAAP_PROTOCOL, service_address, service_port, DMAAP_PATH)
+
+except Exception as e:
+ raise NonRecoverableError("Error configuring dmaap plugin: {0}".format(e))
diff --git a/dmaap/dmaapplugin/dmaaputils.py b/dmaap/dmaapplugin/dmaaputils.py
new file mode 100644
index 0000000..e043a07
--- /dev/null
+++ b/dmaap/dmaapplugin/dmaaputils.py
@@ -0,0 +1,28 @@
+# ============LICENSE_START====================================================
+# org.onap.ccsdk
+# =============================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+# Utility functions
+
+import string
+import random
+
+def random_string(n):
+ '''
+ Create a random alphanumeric string, n characters long.
+ '''
+ return ''.join(random.choice(string.ascii_lowercase + string.ascii_uppercase + string.digits) for x in range(n))
diff --git a/dmaap/dmaapplugin/dr_bridge.py b/dmaap/dmaapplugin/dr_bridge.py
new file mode 100644
index 0000000..4e0df4d
--- /dev/null
+++ b/dmaap/dmaapplugin/dr_bridge.py
@@ -0,0 +1,198 @@
+# ============LICENSE_START====================================================
+# org.onap.ccsdk
+# =============================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+from cloudify import ctx
+from cloudify.decorators import operation
+from cloudify.exceptions import NonRecoverableError
+from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS
+from dmaaputils import random_string
+from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
+
+# Set up a subscriber to a source feed
+def _set_up_subscriber(dmc, source_feed_id, loc, delivery_url, username, userpw):
+ # Add subscriber to source feed
+ add_sub = dmc.add_subscriber(source_feed_id, loc, delivery_url, username, userpw)
+ add_sub.raise_for_status()
+ return add_sub.json()
+
+# Set up a publisher to a target feed
+def _set_up_publisher(dmc, target_feed_id, loc):
+ username = random_string(8)
+ userpw = random_string(10)
+ add_pub = dmc.add_publisher(target_feed_id, loc, username, userpw)
+ add_pub.raise_for_status()
+ pub_info = add_pub.json()
+ return pub_info["pubId"], username, userpw
+
+# Get a central location to use when creating a publisher or subscriber
+def _get_central_location(dmc):
+ locations = dmc.get_dcae_locations('opendcae-central')
+ if len(locations) < 1:
+ raise Exception('No central location found for setting up DR bridging')
+ return locations[0] # We take the first one. Typically there will be two central locations
+
+
+# Set up a "bridge" between two feeds internal to DCAE
+# A source feed "bridges_to" a target feed, meaning that anything published to
+# the source feed will be delivered to subscribers to the target feed (as well as
+# to subscribers of the source feed).
+#
+# The bridge is established by first adding a publisher to the target feed. The result of doing this
+# is a publish URL and a set of publication credentials.
+#The publish URL and publication credentials are used to set up a subscriber to the source feed.
+#I.e., we tell the source feed to deliver to an endpoint which is actually a publish
+# endpoint for the target feed.
+@operation
+def create_dr_bridge(**kwargs):
+
+ try:
+
+ # Get source and target feed ids
+ if 'feed_id' in ctx.target.instance.runtime_properties:
+ target_feed_id = ctx.target.instance.runtime_properties['feed_id']
+ else:
+ raise Exception('Target feed has no feed_id property')
+ if 'feed_id' in ctx.source.instance.runtime_properties:
+ source_feed_id = ctx.source.instance.runtime_properties['feed_id']
+ else:
+ raise Exception('Source feed has no feed_id property')
+
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+
+ # Get a location to use when creating a publisher or subscriber--a central location seems reasonable
+ loc = _get_central_location(dmc)
+
+ ctx.logger.info('Creating bridge from feed {0} to feed {1} using location {2}'.format(source_feed_id, target_feed_id, loc))
+
+ # Add publisher to target feed
+ publisher_id, username, userpw = _set_up_publisher(dmc, target_feed_id, loc)
+ ctx.logger.info("Added publisher id {0} to target feed {1} with user {2}".format(publisher_id, target_feed_id, username))
+
+ # Add subscriber to source feed
+ delivery_url = ctx.target.instance.runtime_properties['publish_url']
+ subscriber_info = _set_up_subscriber(dmc, source_feed_id, loc, delivery_url, username, userpw)
+ subscriber_id = subscriber_info["subId"]
+ ctx.logger.info("Added subscriber id {0} to source feed {1} with delivery url {2}".format(subscriber_id, source_feed_id, delivery_url))
+
+ # Save the publisher and subscriber IDs on the source node, indexed by the target node id
+ ctx.source.instance.runtime_properties[ctx.target.node.id] = {"publisher_id": publisher_id, "subscriber_id": subscriber_id}
+
+ except Exception as e:
+ ctx.logger.error("Error creating bridge: {0}".format(e))
+ raise NonRecoverableError(e)
+
+# Set up a bridge from an internal DCAE feed to a feed in an external Data Router system
+# The target feed needs to be provisioned in the external Data Router system. A publisher
+# to that feed must also be set up in the external Data Router system. The publish URL,
+# username, and password need to be captured in a target node of type dcae.nodes.ExternalTargetFeed.
+# The bridge is established by setting up a subscriber to the internal DCAE source feed using the
+# external feed publisher parameters as delivery parameters for the subscriber.
+@operation
+def create_external_dr_bridge(**kwargs):
+ try:
+
+ # Make sure target feed has full set of properties
+ if 'url' in ctx.target.node.properties and 'username' in ctx.target.node.properties and 'userpw' in ctx.target.node.properties:
+ url = ctx.target.node.properties['url']
+ username = ctx.target.node.properties['username']
+ userpw = ctx.target.node.properties['userpw']
+ else:
+ raise Exception ("Target feed missing url, username, and/or user pw")
+
+ # Make sure source feed has a feed ID
+ if 'feed_id' in ctx.source.instance.runtime_properties:
+ source_feed_id = ctx.source.instance.runtime_properties['feed_id']
+ else:
+ raise Exception('Source feed has no feed_id property')
+
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+
+ # Get a central location to use when creating subscriber
+ loc = _get_central_location(dmc)
+
+ ctx.logger.info('Creating external bridge from feed {0} to external url {1} using location {2}'.format(source_feed_id, url, loc))
+
+ # Create subscription to source feed using properties of the external target feed
+ subscriber_info = _set_up_subscriber(dmc, source_feed_id, loc, url, username, userpw)
+ subscriber_id = subscriber_info["subId"]
+ ctx.logger.info("Added subscriber id {0} to source feed {1} with delivery url {2}".format(subscriber_id, source_feed_id, url))
+
+ # Save the subscriber ID on the source node, indexed by the target node id
+ ctx.source.instance.runtime_properties[ctx.target.node.id] = {"subscriber_id": subscriber_id}
+
+ except Exception as e:
+ ctx.logger.error("Error creating external bridge: {0}".format(e))
+ raise NonRecoverableError(e)
+
+# Set up a bridge from a feed in an external Data Router system to an internal DCAE feed.
+# The bridge is established by creating a publisher on the internal DCAE feed. Then a subscription
+# to the external feed is created through manual provisioning in the external Data Router system, using
+# the publish URL and the publisher username and password for the internal feed as the delivery parameters
+# for the external subscription.
+# In order to obtain the publish URL, publisher username, and password, a blueprint using this sort of
+# bridge will typically have an output that exposes the runtime_property set on the source node in this operation.
+@operation
+def create_external_source_dr_bridge(**kwargs):
+ try:
+ # Get target feed id
+ if 'feed_id' in ctx.target.instance.runtime_properties:
+ target_feed_id = ctx.target.instance.runtime_properties['feed_id']
+ else:
+ raise Exception('Target feed has no feed_id property')
+
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+
+ # Get a central location to use when creating a publisher
+ loc = _get_central_location(dmc)
+
+ # Create a publisher on the target feed
+ publisher_id, username, userpw = _set_up_publisher(dmc, target_feed_id, loc)
+
+ # Save the publisher info on the source node, indexed by the target node
+ ctx.source.instance.runtime_properties[ctx.target.node.id] = {"publisher_id": publisher_id, "url": ctx.target.instance.runtime_properties["publish_url"], "username": username, "userpw": userpw}
+
+ except Exception as e:
+ ctx.logger.error("Error creating external source bridge: {0}".format(e))
+
+# Remove the bridge between the relationship source and target.
+# For a bridge between 2 internal feeds, deletes the subscriber on the source feed and the publisher on the target feed.
+# For a bridge to an external target feed, deletes the subscriber on the source feed.
+# For a bridge from an external source feed, deletes the publisher on the target feed.
+@operation
+def remove_dr_bridge(**kwargs):
+ try:
+
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+
+ if ctx.target.node.id in ctx.source.instance.runtime_properties:
+
+ if 'subscriber_id' in ctx.source.instance.runtime_properties[ctx.target.node.id]:
+ # Delete the subscription for this bridge
+ ctx.logger.info("Removing bridge -- deleting subscriber {0}".format(ctx.source.instance.runtime_properties[ctx.target.node.id]['subscriber_id']))
+ dmc.delete_subscriber(ctx.source.instance.runtime_properties[ctx.target.node.id]['subscriber_id'])
+
+ if 'publisher_id' in ctx.source.instance.runtime_properties:
+ # Delete the publisher for this bridge
+ ctx.logger.info("Removing bridge -- deleting publisher {0}".format(ctx.source.instance.runtime_properties[ctx.target.node.id]['publisher_id']))
+ dmc.delete_publisher(ctx.source.instance.runtime_properties[ctx.target.node.id]['publisher_id'])
+
+ ctx.logger.info("Remove bridge from {0} to {1}".format(ctx.source.node.id, ctx.target.node.id))
+
+ except Exception as e:
+ ctx.logger.error("Error removing bridge: {0}".format(e))
+ # Let the uninstall workflow proceed--don't throw a NonRecoverableError
diff --git a/dmaap/dmaapplugin/dr_lifecycle.py b/dmaap/dmaapplugin/dr_lifecycle.py
new file mode 100644
index 0000000..45f8674
--- /dev/null
+++ b/dmaap/dmaapplugin/dr_lifecycle.py
@@ -0,0 +1,121 @@
+# ============LICENSE_START====================================================
+# org.onap.ccsdk
+# =============================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+from cloudify import ctx
+from cloudify.decorators import operation
+from cloudify.exceptions import NonRecoverableError
+from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, DMAAP_OWNER
+from dmaaputils import random_string
+from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
+
+# Lifecycle operations for DMaaP Data Router feeds
+
+@operation
+def create_feed(**kwargs):
+ '''
+ Create a new data router feed
+ Expects "feed_name" to be set in node properties
+ Allows "feed_version", "feed_description", and "aspr_classification" as optional properties
+ (Sets default values if not provided )
+ Sets instance runtime properties:
+ - "feed_id"
+ - "publish_url"
+ - "log_url"
+
+ '''
+ try:
+ # Make sure there's a feed_name
+ if "feed_name" in ctx.node.properties.keys():
+ feed_name = ctx.node.properties["feed_name"]
+ else:
+ feed_name = random_string(12)
+
+ # Set defaults/placeholders for the optional properties for the feed
+ if "feed_version" in ctx.node.properties.keys():
+ feed_version = ctx.node.properties["feed_version"]
+ else:
+ feed_version = "0.0"
+ if "feed_description" in ctx.node.properties.keys():
+ feed_description = ctx.node.properties["feed_description"]
+ else:
+ feed_description = "No description provided"
+ if "aspr_classification" in ctx.node.properties.keys():
+ aspr_classification = ctx.node.properties["aspr_classification"]
+ else:
+ aspr_classification = "unclassified"
+
+ # Make the request to the controller
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ ctx.logger.info("Attempting to create feed name {0}".format(feed_name))
+ f = dmc.create_feed(feed_name, feed_version, feed_description, aspr_classification, DMAAP_OWNER)
+ f.raise_for_status()
+
+ # Capture important properties from the result
+ feed = f.json()
+ ctx.instance.runtime_properties["feed_id"] = feed["feedId"]
+ ctx.instance.runtime_properties["publish_url"] = feed["publishURL"]
+ ctx.instance.runtime_properties["log_url"] = feed["logURL"]
+ ctx.logger.info("Created feed name {0} with feed id {1}".format(feed_name, feed["feedId"]))
+
+ except Exception as e:
+ ctx.logger.error("Error creating feed: {er}".format(er=e))
+ raise NonRecoverableError(e)
+
+@operation
+def get_existing_feed(**kwargs):
+ '''
+ Find information for an existing data router feed
+ Expects "feed_id" to be set in node properties -- uniquely identifies the feed
+ Sets instance runtime properties:
+ - "feed_id"
+ - "publish_url"
+ - "log_url"
+ '''
+ try:
+ # Make the lookup request to the controller
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ f = dmc.get_feed_info(ctx.node.properties["feed_id"])
+ f.raise_for_status()
+
+ # Capture important properties from the result
+ feed = f.json()
+ ctx.instance.runtime_properties["feed_id"] = ctx.node.properties["feed_id"] # Just to be consistent with newly-created node, above
+ ctx.instance.runtime_properties["publish_url"] = feed["publishURL"]
+ ctx.instance.runtime_properties["log_url"] = feed["logURL"]
+ ctx.logger.info("Found existing feed with feed id {0}".format(ctx.node.properties["feed_id"]))
+
+ except Exception as e:
+ ctx.logger.error("Error getting existing feed id {id}: {er}".format(id=ctx.node.properties["feed_id"],er=e))
+ raise NonRecoverableError(e)
+
+@operation
+def delete_feed(**kwargs):
+ '''
+ Delete a feed
+ Expects "feed_id" to be set on the instance's runtime properties
+ '''
+ try:
+ # Make the lookup request to the controllerid=ctx.node.properties["feed_id"]
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ f = dmc.delete_feed(ctx.instance.runtime_properties["feed_id"])
+ f.raise_for_status()
+ ctx.logger.info("Deleting feed id {0}".format(ctx.instance.runtime_properties["feed_id"]))
+
+ except Exception as e:
+ ctx.logger.error("Error deleting feed id {id}: {er}".format(id=ctx.instance.runtime_properties["feed_id"],er=e))
+ # don't raise a NonRecoverable error here--let the uninstall workflow continue
diff --git a/dmaap/dmaapplugin/dr_relationships.py b/dmaap/dmaapplugin/dr_relationships.py
new file mode 100644
index 0000000..8796354
--- /dev/null
+++ b/dmaap/dmaapplugin/dr_relationships.py
@@ -0,0 +1,211 @@
+# ============LICENSE_START====================================================
+# org.onap.ccsdk
+# =============================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+from cloudify import ctx
+from cloudify.decorators import operation
+from cloudify.exceptions import NonRecoverableError
+from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, CONSUL_HOST
+from dmaaputils import random_string
+from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
+from consulif.consulif import ConsulHandle
+
+# Lifecycle operations for DMaaP Data Router
+# publish and subscribe relationships
+
+@operation
+def add_dr_publisher(**kwargs):
+ '''
+ Sets up the source of the publishes_relationship as a publisher to the feed that
+ is the target of the relationship
+ Assumes target (the feed) has the following runtime properties set
+ - feed_id
+ - log_url
+ - publish_url
+ Assumes source (the publisher) has a runtime property whose name matches the node name of the feed.
+ This is a dictionary containing one property:
+ - location (the dcaeLocationName to pass when adding the publisher to the feed)
+ Generates a user name and password that the publisher will need to use when publishing
+ Adds the following properties to the dictionary above:
+ - publish_url
+ - log_url
+ - username
+ - password
+ '''
+ try:
+ # Make sure we have a name under which to store DMaaP configuration
+ # Check early so we don't needlessly create DMaaP entities
+ if 'service_component_name' not in ctx.source.instance.runtime_properties:
+ raise Exception("Source node does not have 'service_component_name' in runtime_properties")
+
+ target_feed = ctx.target.node.id
+ ctx.logger.info("Attempting to add publisher {0} to feed {1}".format(ctx.source.node.id, target_feed))
+
+ # Set up the parameters for the add_publisher request to the DMaaP bus controller
+ feed_id = ctx.target.instance.runtime_properties["feed_id"]
+ location = ctx.source.instance.runtime_properties[target_feed]["location"]
+ username = random_string(8)
+ password = random_string(10)
+
+ # Make the request to add the publisher to the feed
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ add_pub = dmc.add_publisher(feed_id, location, username, password)
+ add_pub.raise_for_status()
+ publisher_info = add_pub.json()
+ publisher_id = publisher_info["pubId"]
+ ctx.logger.info("Added publisher id {0} to feed {1} at {2}, with user {3}, pass {4}".format(publisher_id, feed_id, location, username, password))
+
+ # Set runtime properties on the source
+ ctx.source.instance.runtime_properties[target_feed] = {
+ "publisher_id" : publisher_id,
+ "location" : location,
+ "publish_url" : ctx.target.instance.runtime_properties["publish_url"],
+ "log_url" : ctx.target.instance.runtime_properties["log_url"],
+ "username" : username,
+ "password" : password
+ }
+
+ # Set key in Consul
+ ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
+ ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_feed, ctx.source.instance.runtime_properties[target_feed])
+
+ except Exception as e:
+ ctx.logger.error("Error adding publisher to feed: {er}".format(er=e))
+ raise NonRecoverableError(e)
+
+
+@operation
+def delete_dr_publisher(**kwargs):
+ '''
+ Deletes publisher (the source of the publishes_files relationship)
+ from the feed (the target of the relationship).
+ Assumes that the 'publisher_id' property was added to the dictionary of feed-related properties,
+ when the publisher was added to the feed.
+ '''
+
+ try:
+ # Make sure we have a name under which to store DMaaP configuration
+ # Check early so we don't needlessly create DMaaP entities
+ if 'service_component_name' not in ctx.source.instance.runtime_properties:
+ raise Exception("Source node does not have 'service_component_name' in runtime_properties")
+
+ # Get the publisher id
+ target_feed = ctx.target.node.id
+ publisher_id = ctx.source.instance.runtime_properties[target_feed]["publisher_id"]
+ ctx.logger.info("Attempting to delete publisher {0}".format(publisher_id, target_feed))
+
+ # Make the request
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ del_result = dmc.delete_publisher(publisher_id)
+ del_result.raise_for_status()
+
+ ctx.logger.info("Deleted publisher {0}".format(publisher_id))
+
+ # Attempt to remove the entire ":dmaap" entry from the Consul KV store
+ # Will quietly do nothing if the entry has already been removed
+ ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
+ ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']))
+
+ except Exception as e:
+ ctx.logger.error("Error deleting publisher: {er}".format(er=e))
+ # don't raise a NonRecoverable error here--let the uninstall workflow continue
+
+
+@operation
+def add_dr_subscriber(**kwargs):
+ '''
+ Sets up the source of the subscribes_to_files relationship as a subscriber to the
+ feed that is the target of the relationship.
+ Assumes target (the feed) has the following runtime property set
+ - feed_id
+ Assumes source (the subscriber) has a runtime property whose name matches the node name of the feed.
+ This is a dictionary containing the following properties:
+ - location (the dcaeLocationName to pass when adding the publisher to the feed)
+ - delivery_url (the URL to which data router will deliver files)
+ - username (the username data router will use when delivering files)
+ - password (the password data router will use when delivering files)
+ Adds a property to the dictionary above:
+ - subscriber_id (used to delete the subscriber in the uninstall workflow
+ '''
+ try:
+ target_feed = ctx.target.node.id
+ ctx.logger.info("Attempting to add subscriber {0} to feed {1}".format(ctx.source.node.id, target_feed))
+
+ # Get the parameters for the call
+ feed_id = ctx.target.instance.runtime_properties["feed_id"]
+ location = ctx.source.instance.runtime_properties[target_feed]["location"]
+ delivery_url = ctx.source.instance.runtime_properties[target_feed]["delivery_url"]
+ username = ctx.source.instance.runtime_properties[target_feed]["username"]
+ password = ctx.source.instance.runtime_properties[target_feed]["password"]
+
+ # Make the request to add the subscriber to the feed
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ add_sub = dmc.add_subscriber(feed_id, location, delivery_url,username, password)
+ add_sub.raise_for_status()
+ subscriber_info = add_sub.json()
+ subscriber_id = subscriber_info["subId"]
+ ctx.logger.info("Added subscriber id {0} to feed {1} at {2}".format(subscriber_id, feed_id, location))
+
+ # Add subscriber_id to the runtime properties
+ # ctx.source.instance.runtime_properties[target_feed]["subscriber_id"] = subscriber_id
+ ctx.source.instance.runtime_properties[target_feed] = {
+ "subscriber_id": subscriber_id,
+ "location" : location,
+ "delivery_url" : delivery_url,
+ "username" : username,
+ "password" : password
+ }
+ ctx.logger.info("on source: {0}".format(ctx.source.instance.runtime_properties[target_feed]))
+
+ # Set key in Consul
+ ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
+ ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_feed, ctx.source.instance.runtime_properties[target_feed])
+
+ except Exception as e:
+ ctx.logger.error("Error adding subscriber to feed: {er}".format(er=e))
+ raise NonRecoverableError(e)
+
+
+@operation
+def delete_dr_subscriber(**kwargs):
+ '''
+ Deletes subscriber (the source of the subscribes_to_files relationship)
+ from the feed (the target of the relationship).
+ Assumes that the source node's runtime properties dictionary for the target feed
+ includes 'subscriber_id', set when the publisher was added to the feed.
+ '''
+ try:
+ # Get the subscriber id
+ target_feed = ctx.target.node.id
+ subscriber_id = ctx.source.instance.runtime_properties[target_feed]["subscriber_id"]
+ ctx.logger.info("Attempting to delete subscriber {0} from feed {1}".format(subscriber_id, target_feed))
+
+ # Make the request
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ del_result = dmc.delete_subscriber(subscriber_id)
+ del_result.raise_for_status()
+
+ ctx.logger.info("Deleted subscriber {0}".format(subscriber_id))
+
+ # Attempt to remove the entire ":dmaap" entry from the Consul KV store
+ # Will quietly do nothing if the entry has already been removed
+ ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
+ ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']))
+
+ except Exception as e:
+ ctx.logger.error("Error deleting subscriber: {er}".format(er=e))
+ # don't raise a NonRecoverable error here--let the uninstall workflow continue
diff --git a/dmaap/dmaapplugin/mr_lifecycle.py b/dmaap/dmaapplugin/mr_lifecycle.py
new file mode 100644
index 0000000..16ad953
--- /dev/null
+++ b/dmaap/dmaapplugin/mr_lifecycle.py
@@ -0,0 +1,121 @@
+# ============LICENSE_START====================================================
+# org.onap.ccsdk
+# =============================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+from cloudify import ctx
+from cloudify.decorators import operation
+from cloudify.exceptions import NonRecoverableError
+from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, DMAAP_OWNER
+from dmaaputils import random_string
+from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
+
+# Lifecycle operations for DMaaP Message Router topics
+@operation
+def create_topic(**kwargs):
+ '''
+ Creates a message router topic.
+ Allows 'topic_name', 'topic_description', 'txenable', 'replication_case',
+ and 'global_mr_url' as optional node properties. If 'topic_name' is not set,
+ generates a random one.
+ Sets 'fqtn' in the instance runtime_properties.
+ Note that 'txenable' is a Message Router flag indicating whether transactions
+ are enabled on the topic.
+ '''
+ try:
+ # Make sure there's a topic_name
+ if "topic_name" in ctx.node.properties:
+ topic_name = ctx.node.properties["topic_name"]
+ else:
+ topic_name = random_string(12)
+
+ # Make sure there's a topic description
+ if "topic_description" in ctx.node.properties:
+ topic_description = ctx.node.properties["topic_description"]
+ else:
+ topic_description = "No description provided"
+
+ # ..and the truly optional setting
+ if "txenable" in ctx.node.properties:
+ txenable = ctx.node.properties["txenable"]
+ else:
+ txenable= False
+
+ if "replication_case" in ctx.node.properties:
+ replication_case = ctx.node.properties["replication_case"]
+ else:
+ replication_case = None
+
+ if "global_mr_url" in ctx.node.properties:
+ global_mr_url = ctx.node.properties["global_mr_url"]
+ else:
+ global_mr_url = None
+
+
+ # Make the request to the controller
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ ctx.logger.info("Attempting to create topic name {0}".format(topic_name))
+ t = dmc.create_topic(topic_name, topic_description, txenable, DMAAP_OWNER, replication_case, global_mr_url)
+ t.raise_for_status()
+
+ # Capture important properties from the result
+ topic = t.json()
+ ctx.instance.runtime_properties["fqtn"] = topic["fqtn"]
+
+ except Exception as e:
+ ctx.logger.error("Error creating topic: {er}".format(er=e))
+ raise NonRecoverableError(e)
+
+@operation
+def get_existing_topic(**kwargs):
+ '''
+ Get data for an existing feed.
+ Expects 'fqtn' as a node property.
+ Copies this property to 'fqtn' in runtime properties for consistency
+ with a newly-created topic.
+ While there's no real need to make a call to the DMaaP bus controller,
+ we do so just to make sure the fqtn is known to the controller, so we
+ don't run into problems when we try to add a publisher or subscriber later.
+ '''
+ try:
+ fqtn = ctx.node.properties["fqtn"]
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ ctx.logger.info("Attempting to get info for existing topic {0}".format(fqtn))
+ t = dmc.get_topic_info(fqtn)
+ t.raise_for_status()
+
+ ctx.instance.runtime_properties["fqtn"] = ctx.node.properties["fqtn"]
+
+ except Exception as e:
+ ctx.logger.error("Error getting existing topic: {er}".format(er=e))
+ raise NonRecoverableError(e)
+
+@operation
+def delete_topic(**kwargs):
+ '''
+ Delete the topic. Expects the instance runtime property "fqtn" to have been
+ set when the topic was created.
+ '''
+ try:
+ fqtn = ctx.instance.runtime_properties["fqtn"]
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ ctx.logger.info("Attempting to delete topic {0}".format(fqtn))
+ t = dmc.delete_topic(fqtn)
+ t.raise_for_status()
+
+ except Exception as e:
+ ctx.logger.error("Error getting existing topic: {er}".format(er=e))
+ # don't raise a NonRecoverable error here--let the uninstall workflow continue
diff --git a/dmaap/dmaapplugin/mr_relationships.py b/dmaap/dmaapplugin/mr_relationships.py
new file mode 100644
index 0000000..ff92d67
--- /dev/null
+++ b/dmaap/dmaapplugin/mr_relationships.py
@@ -0,0 +1,119 @@
+# ============LICENSE_START====================================================
+# org.onap.ccsdk
+# =============================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+from cloudify import ctx
+from cloudify.decorators import operation
+from cloudify.exceptions import NonRecoverableError
+from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, DMAAP_OWNER, CONSUL_HOST
+from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
+from consulif.consulif import ConsulHandle
+
+# Message router relationship operations
+
+def _add_mr_client(ctype, actions):
+ '''
+ Adds the node represented by 'source' as a client (publisher or subscriber) to
+ to topic represented by the 'target' node. The list of actions in 'actions'
+ determines whether the client is a subscriber or a publisher.
+
+ Assumes target (the topic) has the following runtime property set
+ - fqtn
+ Assumes source (the client) has a runtime property whose name matches the node name of the feed.
+ This is a dictionary containing the following properties:
+ - location (the dcaeLocationName to pass when adding the client to the topic)
+ - client_role (the AAF client role under which the client will access the topic)
+ Adds two properties to the dictionary above:
+ - topic_url (the URL that the client can use to access the topic)
+ - client_id (used to delete the client in the uninstall workflow)
+ '''
+ try:
+ # Make sure we have a name under which to store DMaaP configuration
+ # Check early so we don't needlessly create DMaaP entities
+ if 'service_component_name' not in ctx.source.instance.runtime_properties:
+ raise Exception("Source node does not have 'service_component_name' in runtime_properties")
+
+ target_topic = ctx.target.node.id # Key for the source's dictionary with topic-related info
+ fqtn = ctx.target.instance.runtime_properties["fqtn"]
+ ctx.logger.info("Attempting to add {0} as {1} to topic {2}".format(ctx.source.node.id, ctype, fqtn))
+
+ # Get the parameters needed for adding the client
+ location = ctx.source.instance.runtime_properties[target_topic]["location"]
+ client_role = ctx.source.instance.runtime_properties[target_topic]["client_role"]
+
+ # Make the request to add the client to the topic
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ c = dmc.create_client(fqtn, location, client_role, actions)
+ c.raise_for_status()
+ client_info = c.json()
+ client_id = client_info["mrClientId"]
+ topic_url = client_info["topicURL"]
+
+ # Update source's runtime properties
+ #ctx.source.instance.runtime_properties[target_topic]["topic_url"] = topic_url
+ #ctx.source.instance.runtime_properties[target_topic]["client_id"] = client_id
+ ctx.source.instance.runtime_properties[target_topic] = {
+ "topic_url" : topic_url,
+ "client_id" : client_id,
+ "location" : location,
+ "client_role" : client_role
+ }
+
+ ctx.logger.info("Added {0} id {1} to feed {2} at {3}".format(ctype, client_id, fqtn, location))
+
+ # Set key in Consul
+ ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
+ ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_topic, ctx.source.instance.runtime_properties[target_topic])
+
+ except Exception as e:
+ ctx.logger.error("Error adding client to feed: {er}".format(er=e))
+ raise NonRecoverableError(e)
+
+@operation
+def add_mr_publisher(**kwargs):
+ _add_mr_client("publisher", ["view", "pub"])
+
+@operation
+def add_mr_subscriber(**kwargs):
+ _add_mr_client("subscriber", ["view", "sub"])
+
+@operation
+def delete_mr_client(**kwargs):
+ '''
+ Delete the client (publisher or subscriber).
+ Expect property 'client_id' to have been set in the instance's runtime_properties
+ when the client was created.
+ '''
+ try:
+ target_topic = ctx.target.node.id
+ client_id = ctx.source.instance.runtime_properties[target_topic]["client_id"]
+ ctx.logger.info("Attempting to delete client {0} ".format(client_id))
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ c = dmc.delete_client(client_id)
+ c.raise_for_status()
+
+ ctx.logger.info("Deleted client {0}".format(client_id))
+
+ # Attempt to remove the entire ":dmaap" entry from the Consul KV store
+ # Will quietly do nothing if the entry has already been removed
+ ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
+ ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']))
+
+ except Exception as e:
+ ctx.logger.error("Error deleting MR client: {er}".format(er=e))
+ # don't raise a NonRecoverable error here--let the uninstall workflow continue
+
diff --git a/dmaap/requirements.txt b/dmaap/requirements.txt
new file mode 100644
index 0000000..ffdb97f
--- /dev/null
+++ b/dmaap/requirements.txt
@@ -0,0 +1 @@
+python-consul==0.7.0
diff --git a/dmaap/setup.py b/dmaap/setup.py
new file mode 100644
index 0000000..0d23668
--- /dev/null
+++ b/dmaap/setup.py
@@ -0,0 +1,16 @@
+from setuptools import setup, find_packages
+
+setup(
+ name = "cloudifydmaapplugin",
+ version = "1.2.0",
+ packages=find_packages(),
+ author = "AT&T",
+ description = ("Cloudify plugin for creating DMaaP feeds and topics, and setting up publishers and subscribers."),
+ license = "",
+ keywords = "",
+ url = "",
+ zip_safe=False,
+ install_requires = [
+ "python-consul==0.7.0"
+ ]
+)
diff --git a/dmaap/tests/test_plugin.py b/dmaap/tests/test_plugin.py
new file mode 100644
index 0000000..b9ebedc
--- /dev/null
+++ b/dmaap/tests/test_plugin.py
@@ -0,0 +1,26 @@
+# ============LICENSE_START====================================================
+# org.onap.ccsdk
+# =============================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+import pytest
+import requests
+from cloudify.mocks import MockCloudifyContext
+from cloudify.state import current_ctx
+from cloudify.exceptions import NonRecoverableError
+
+def test_noop():
+ pass
diff --git a/dmaap/tox.ini b/dmaap/tox.ini
new file mode 100644
index 0000000..9498c82
--- /dev/null
+++ b/dmaap/tox.ini
@@ -0,0 +1,26 @@
+# ============LICENSE_START====================================================
+# org.onap.ccsdk
+# =============================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+[tox]
+envlist = py27
+[testenv]
+deps=
+ pytest
+ cloudify==3.4
+ requests
+commands=pytest