summaryrefslogtreecommitdiffstats
path: root/dmaap
diff options
context:
space:
mode:
Diffstat (limited to 'dmaap')
-rw-r--r--dmaap/.gitignore4
-rw-r--r--dmaap/LICENSE.txt17
-rw-r--r--dmaap/README.md324
-rw-r--r--dmaap/consulif/__init__.py0
-rw-r--r--dmaap/consulif/consulif.py125
-rw-r--r--dmaap/dmaap.yaml202
-rw-r--r--dmaap/dmaapcontrollerif/__init__.py1
-rw-r--r--dmaap/dmaapcontrollerif/dmaap_requests.py310
-rw-r--r--dmaap/dmaapplugin/__init__.py82
-rw-r--r--dmaap/dmaapplugin/dmaaputils.py29
-rw-r--r--dmaap/dmaapplugin/dr_bridge.py199
-rw-r--r--dmaap/dmaapplugin/dr_lifecycle.py153
-rw-r--r--dmaap/dmaapplugin/dr_relationships.py219
-rw-r--r--dmaap/dmaapplugin/mr_lifecycle.py143
-rw-r--r--dmaap/dmaapplugin/mr_relationships.py119
-rw-r--r--dmaap/pom.xml327
-rw-r--r--dmaap/requirements.txt3
-rw-r--r--dmaap/setup.py36
-rw-r--r--dmaap/tests/conftest.py88
-rw-r--r--dmaap/tests/test_consulif.py72
-rw-r--r--dmaap/tests/test_dmaapcontrollerif.py113
-rw-r--r--dmaap/tests/test_dr_lifecycle.py65
-rw-r--r--dmaap/tests/test_mr_lifecycle.py59
-rw-r--r--dmaap/tests/test_plugin.py26
-rw-r--r--dmaap/tests/test_utils.py26
-rw-r--r--dmaap/tox.ini36
26 files changed, 2778 insertions, 0 deletions
diff --git a/dmaap/.gitignore b/dmaap/.gitignore
new file mode 100644
index 0000000..9ef55c9
--- /dev/null
+++ b/dmaap/.gitignore
@@ -0,0 +1,4 @@
+# local additions to plugins .gitignore
+wheels
+cdap.zip
+docker.zip
diff --git a/dmaap/LICENSE.txt b/dmaap/LICENSE.txt
new file mode 100644
index 0000000..86c0033
--- /dev/null
+++ b/dmaap/LICENSE.txt
@@ -0,0 +1,17 @@
+============LICENSE_START=======================================================
+org.onap.dcaegen2
+================================================================================
+Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+================================================================================
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+============LICENSE_END=========================================================
diff --git a/dmaap/README.md b/dmaap/README.md
new file mode 100644
index 0000000..55ac621
--- /dev/null
+++ b/dmaap/README.md
@@ -0,0 +1,324 @@
+## Cloudify DMaaP Plugin
+Cloudify plugin for creating and managing DMaaP Data Router feeds and subscriptions and
+DMaaP Message Router topics. The plugin uses the DMaaP Bus Controller API.
+
+### Plugin Support for DMaaP Data Router
+#### Plugin Types for DMaaP Data Router
+The Cloudify type definitions for DMaaP Data Router nodes and relationships
+are defined in [`dmaap.yaml`](./dmaap.yaml).
+
+There are four node types for DMaaP Data Router:
+
+- `dcaegen2.nodes.Feed`: This type represents a feed that does not yet
+exist and that should be created when the install workflow is
+run against a blueprint that contains a node of this type.
+
+Property|Type|Required?|Description |
+--------|----|---------|---------------------------------------
+feed_name|string|no|a name that identifies the feed (plugin will generate if absent)
+feed_version|string|no|version number for the feed (feed_name + feed_version uniquely identify the feed in DR)
+feed_description|string|no|human-readable description of the feed
+aspr_classification|string|no|AT&T ASPR classification of the feed
+
+
+- `dcaegen2.nodes.ExistingFeed`: This type represents a feed that
+already exists. Nodes of this type are placed in a blueprint so
+that other nodes in the blueprint can be set up as publishers or
+subscribers to the feed. The table below shows the properties that a node
+of this type may have.
+
+Property|Type|Required?|Description
+--------|----|---------|----------------------------------------
+feed_id|string|no|Feed identifier assigned by DMaaP when the feed was created
+feed_name|string|no|a name that identifies the feed
+
+- `dcaegen2.nodes.ExternalTargetFeed`: This type represents a feed created in an external DMaaP
+environment (i.e., an environment that the plugin cannot access to make provisioning requests, such as
+a shared corporate system). Nodes of this type are placed in a blueprint so that other feed nodes of
+type `dcaegen2.nodes.Feed` or `dcaegen2.nodes.ExistingFeed` can be set up to "bridge" to external feeds by
+publishing data to the external feeds. The table below shows the properties that a node of this type
+may have.
+
+Property|Type|Required?|Description
+--------|----|---------|----------------------------------------
+url|string|yes|The publish URL of the external feed.
+username|string|yes|The username to be used when delivering to the external feed
+userpw|string|yes|The password to be used when delivering to the external feed
+
+_Note: These properties are usually obtained by manually creating a feed in the external
+DMaaP DR system and then creating a publisher for that feed._
+
+- `dcaegen2.nodes.ExternalSourceFeed`: This type represents a feed created in an external DMaaP
+environment (i.e., an environment that the plugin cannot access to makes provisioning requests, such as
+a shared corporate system). Nodes of this type are place in a blueprint so that they can be set up to
+"bridge" to other feed nodes of type `dcaegen2.nodes.Feed` or `dcaegen2.nodes.ExistingFeed`. This type
+has no node properties, but when a bridge is set up, the url, username, and password are attached to the
+node as runtime_properties, using the name of the target feed node as the top-level key.
+
+There are five relationship types for DMaaP Data Router:
+
+- `dcaegen2.relationships.publish_files`, used to
+indicate that the relationship's source node sends is a publisher to the
+Data Router feed represented by the relationship's target node.
+- `dcaegen2.relationships.subscribe_to_files`, used to
+indicate that the relationship's source node is a subscriber to the
+Data Router feed represented by the relationship's target node.
+- `dcaegen2.relationships.bridges_to`, used to indicate that the relationship's source
+node (a `dcaegen2.nodes.Feed` or `dcaegen2.nodes.ExistingFeed`) should be set up
+to forward data ("bridge") to the relationship's target feed (another `dcaegen2.nodes.Feed` or
+`dcaegen2.nodes.ExistingFeed`).
+- `dcaegen2.relationships.bridges_to_external`, used to indicate that the relationship's source
+node (a `dcaegen2.nodes.Feed` or `dcaegen2.nodes.ExistingFeed`) should be set up
+to forward data ("bridge") to the relationship's target node (a feed in an external DMaaP system,
+represented by a `dcaegen2.nodes.ExternalTargetFeed` node).
+- `dcaegen2.relationships.bridges_from_external_to_internal`, used to indicate the the relationship's source
+node (a feed in an external DMaaP system, represented by a `dcaegen2.nodes.ExternalSourceFeed` node) should be set up to forward date ("bridge")
+to the relationship's target node (an internal ONAP feed, represented by a `dcaegen2.nodes.Feed` or `dcaegen2.nodes.ExistingFeed` node).
+
+The plugin code implements the lifecycle operations needed to create and
+delete feeds and to add and remove publishers and subscribers. It also implements
+the operations needed to set up bridging between feeds.
+
+#### Interaction with Other Plugins
+When creating a new feed or processing a reference to an existing feed,
+the plugin operates independently of other plugins.
+
+When processing a `dcaegen2.relationships.publish_files` relationship or a
+`dcaegen2.relationships.subscribe_to_files` relationship, this plugin needs
+to obtain data from the source node and, in the case of `publish_files`, provide
+data to the source node. Certain conventions are therefore needed for
+passing data between this plugin and the plugins responsible for the source
+nodes in these relationships. In Cloudify, the mechanism for
+sharing data among plugins is the `ctx.instance.runtime_properties` dictionary
+associated with each node.
+
+A given source node may have relationships with several feeds. For example, an ONAP DCAE
+data collector might publish two different types of data to two different feeds. An ONAP DCAE
+analytics module might subscribe to one feed to get input for its processing and
+publish its results to a different feed. When this DMaaP plugin and the plugin for the
+source node exchange information, they need to do in a way that lets them distinguish
+among different feeds. We do this through a simple convention: for each source node
+to feed relationship, the source node plugin will create a property in the source node's
+`runtime_properties` dictionary. The name of the property will be the same as the
+name of the target node of the relationship. For instance, if a node has a
+`publishes_files` relationship with a target node named `feed00`, then the plugin that's
+responsible for managing the source node with create an entry in the source node's
+`runtime_properties` dictionary named `feed00`. This entry itself will be a dictionary.
+
+The content of this data exchange dictionary depends on whether the source node is a
+publisher (i.e., the relationship is `publish_files`) or a subscriber (i.e., the
+relationship is `subscribe_to_files`).
+
+For the `publish_files` relationship, the data exchange dictionary has the following
+properties:
+
+Property|Set by|Description
+--------|------|------------------------------------------------
+location|source node plugin|the DMaaP location for the publisher, used to set up routing
+publish_url|DMaaP plugin|the URL to which the publisher makes Data Router publish requests
+log_url|DMaaP plugin|the URL from which log data for the feed can be obtained
+username|DMaaP plugin|the username (generated by the DMaaP plugin) the publisher uses to authenticate to Data Router
+password|DMaaP plugin|the password (generated by the DMaaP plugin) the publisher uses to authenticate to Data Router
+
+For the `subscribe_to_files` relationship, the data exchange dictionary has the following
+properties:
+
+Property|Set by|Description
+--------|------|------------------------------------------------
+location|source node plugin|the DMaaP location for the subscriber, used to set up routing
+delivery_url|source node plugin|the URL to which the Data Router should deliver files
+username|source node plugin|the username Data Router uses to authenticate to the subscriber when delivering files
+password|source node plugin|the username Data Router uses to authenticate to the subscriber when delivering file
+
+### Plugin Support for DMaaP Message Router
+#### Plugin Types for DMaaP Message Router
+The Cloudify type definitions for DMaaP Message Router nodes and relationships
+are defined in [`dmaap.yaml`](./dmaap.yaml).
+
+There are two node types for DMaaP Message Router:
+
+- `dcaegen2.nodes.Topic`: This type represents a topic that does not yet
+exist and that should be created when the install workflow is
+run against a blueprint that contains a node of this type.
+
+Property|Type|Required?|Description
+--------|----|---------|---------------------------------------
+topic_name|string|no|a name that uniquely identifies the feed (plugin will generate if absent or is empty string or contain only whitespace)
+topic_description|string|no|human-readable description of the feed
+txenable|boolean|no|flag indicating whether transactions are enabled for this topic
+replication_case|string|no|type of replication required for the topic (defaults to no replication)
+global_mr_url|string|no|Global MR host name for replication to a global MR instance
+
+Note: In order to set up topics, a user should be familiar with message router and how it is configured,
+and this README is not the place to explain the details of message router. Here are a couple of pieces of
+information that might be helpful.
+Currently, the allowed values for `replication_case` are:
+
+- `REPLICATION_NONE`
+- `REPLICATION_EDGE_TO_CENTRAL`
+- `REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL`
+- `REPLICATION_CENTRAL_TO_EDGE`
+- `REPLICATION_CENTRAL_TO_GLOBAL`
+- `REPLICATION_GLOBAL_TO_CENTRAL`
+- `REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE`
+
+The `global_mr_url` is actually a host name, not a full URL. It points to a host in a global message router
+cluster. (A 'global' message router cluster is one that's not part of ONAP.)
+
+- `dcaegen2.nodes.ExistingTopic`: This type represents a topic that
+already exists. Nodes of this type are placed in a blueprint so
+that other nodes in the blueprint can be set up as publishers or
+subscribers to the topic. The table below shows the properties that a node
+of this type may have.
+
+Property|Type|Required?|Description
+--------|----|---------|----------------------------------------
+fqtn|string|no|fully-qualified topic name for the topic
+topic_name|string|no|a name that identifies the topic
+
+#### Interaction with Other Plugins
+When creating a new topic or processing a reference to an existing topic,
+the plugin operates independently of other plugins.
+
+When processing a `dcaegen2.relationships.publish_events` relationship or a
+`dcaegen2.relationships.subscribe_to_events` relationship, this plugin needs
+to obtain data from and provide data to the source node. Certain conventions are therefore needed for
+passing data between this plugin and the plugins responsible for the source
+nodes in these relationships. In Cloudify, the mechanism for
+sharing data among plugins is the `ctx.instance.runtime_properties` dictionary
+associated with each node.
+
+A given source node may have relationships with several topics. For example, an ONAP DCAE
+analytics module might subscribe to one topic to get input for its processing and
+publish its results to a different topic. When this DMaaP plugin and the plugin for the
+source node exchange information, they need to do in a way that lets them distinguish
+among different feeds. We do this through a simple convention: for each source node
+to topic relationship, the source node plugin will create a property in the source node's
+`runtime_properties` dictionary. The name of the property will be the same as the
+name of the target node of the relationship. For instance, if a node has a
+`publishes_events` relationship with a target node named `topic00`, then the plugin that's
+responsible for managing the source node with create an entry in the source node's
+`runtime_properties` dictionary named `topic00`. This entry itself will be a dictionary.
+
+For both types of relationship, the data exchange dictionary has the following
+properties:
+
+Property|Set by|Description
+--------|------|------------------------------------------------
+location|source node plugin|the DMaaP location for the publisher or subscriber, used to set up routing
+client_role|source node plugin|the AAF client role that's requesting publish or subscribe access to the topic
+topic_url|DMaaP plugin|the URL for accessing the topic to publish or receive events
+
+### Interaction with Consul configuration store
+In addition to storing the results of DMaaP Data Router and DMaaP Message Router provisioning operations in `runtime_properties`,
+the DMaaP plugin also stores these results into the ONAP configuration store, which resides in a
+[Consul key-value store](https://www.consul.io/). This allows DMaaP clients (components that act as publishers, subscribers, or both)
+to retrieve their DMaaP configuration information from Consul, rather than having the plugin that deploys the client directly
+configure the client using data in `runtime_properties`.
+
+The `runtime_properties` for a client must contain a property called `service_component_name`. If this property is not present,
+the plugin will raise a NonRecoverableError and cause the installation to fail.
+
+If `service_component_name` is present, then the plugin will use a Consul key consisting of the value
+of `service_component_name` prepended to the fixed string `:dmaap`. For example, if the `service_component_name`
+is `client123`, the plugin will use `client123:dmaap` as the key for storing DMaaP information into Consul.
+Information for all of the feeds and topics for a client are stored under the same key.
+
+The value stored is a nested JSON object. At the top level of the object are properties representing each topic or feed
+for which the component is a publisher or subscriber. The name of the property is the node name of the target feed or topic.
+The value of the property is another JSON object that corresponds to the dictionary that the plugin created in
+`runtime_properties` corresponding to the target feed or topic. Note that the information in Consul includes
+all of the properties for the feed or topic, those set by the source node plugin as well as those set by the DMaaP plugin.
+
+Examples:
+
+Data Router publisher, target feed `feed00`:
+```
+{
+ "feed00": {
+ "username": "rC9QR51I",
+ "log_url": "https://dmaap.example.com/feedlog/972",
+ "publish_url": "https://dmaap.example.com/publish/972",
+ "location": "loc00",
+ "password": "QOQeUh5KLR",
+ "publisher_id": "972.360gm"
+ }
+}
+```
+
+Data Router subscriber, target feed `feed01`:
+```
+{
+ "feed01": {
+ "username": "drdeliver",
+ "password": "1loveDataR0uter",
+ "location": "loc00",
+ "delivery_url": "https://example.com/whatever",
+ "subscriber_id": "1550"
+ }
+}
+```
+
+Message Router publisher to `topic00`, subscriber to `topic01`. Note how each topic
+appears as a top-level property in the object.
+```
+{
+ "topic00": {
+ "topic_url": "https://dmaap.example.com:3905/events/org.onap.dcaegen2.dmaap.FTL2.outboundx",
+ "client_role": "org.onap.dcaegen2.member",
+ "location": "loc00",
+ "client_id": "1494621774522"
+ },
+ "topic01": {
+ "topic_url": "https://dmaap.example.com:3905/events/org.onap.dcaegen2.dmaap.FTL2.inboundx",
+ "client_role": "org.onap.dcaegen2.member",
+ "location": "loc00",
+ "client_id": "1494621778627"
+ }
+}
+```
+
+### Packaging and installing
+The DMaaP plugin is meant to be used as a [Cloudify managed plugin](http://docs.getcloudify.org/3.4.0/plugins/using-plugins/). Managed plugins
+are packaged using [`wagon`](https://github.com/cloudify-cosmo/wagon).
+
+To package this plugin, executing the following command in the top-level directory of this plugin, from a Python environment in which `wagon` has been installed:
+```
+wagon create -s . -r -o /path/to/directory/for/wagon/output
+```
+Once the wagon file is built, it can be uploaded to a Cloudify Manager host using the `cfy plugins upload` command described in the documentation above.
+
+Managed plugins can also be loaded at the time a Cloudify Manager host is installed, via the installation blueprint and inputs file. We expect that this plugin will
+be loaded at Cloudify Manager installation time, and that `cfy plugins upload` will be used only for delivering patches between releases.
+
+### Configuration
+The plugin needs to be configured with certain parameters needed to access the DMaaP Bus Controller. In keeping with the ONAP architecture, this information is
+stored in Consul.
+
+The plugin finds the address and port of the DMaaP Bus Controller using the Consul service discovery facility. The plugin expects the Bus Controller to be
+registered under the name `dmaap_bus_controller`.
+
+Additional parameters come from the `dmaap` key in the Cloudify Manager's Consul configuration, which is stored in the Consul KV store under the key name
+'cloudify_manager'. The table below lists the properties in the configuration:
+
+Property|Type|Required?|Default|Description
+--------|----|---------|-------|--------------------------------
+`username`|string|Yes|(none)|The username for logging into DMaaP Bus Controller
+`password`|string|Yes|(none)|The password for logging into DMaaP Bus Controller
+`owner`|string|Yes|(none)|The name to be used as the owner for entities created by the plugin
+`protocol`|string|No|`https`|The protocol (URL scheme) used to access the DMaaP bus controller (`http` or `https`)
+`path`|string|No|`webapi`|The path to the root of the DMaaP Bus Controller API endpoint
+
+Here is an example of a Cloudify Manager configuration object showing only the `dmaap` key:
+```
+{
+ "dmaap": {
+ "username": "dmaap.client@dcaegen2orch.onap.org",
+ "password": "guessmeifyoucan"
+ "owner": "dcaegen2orc"
+ },
+
+ (other configuration here)
+
+}
+```
diff --git a/dmaap/consulif/__init__.py b/dmaap/consulif/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/dmaap/consulif/__init__.py
diff --git a/dmaap/consulif/consulif.py b/dmaap/consulif/consulif.py
new file mode 100644
index 0000000..a865df4
--- /dev/null
+++ b/dmaap/consulif/consulif.py
@@ -0,0 +1,125 @@
+# ============LICENSE_START====================================================
+# org.onap.dcaegen2
+# =============================================================================
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2020 Pantheon.tech. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+import consul
+import json
+try:
+ from urllib.parse import urlparse
+except ImportError:
+ from urlparse import urlparse
+
+
+class ConsulHandle(object):
+ '''
+ Provide access to Consul KV store and service discovery
+ '''
+
+ def __init__(self, api_url, user, password, logger):
+ '''
+ Constructor
+ '''
+ u = urlparse(api_url)
+ self.ch = consul.Consul(host=u.hostname, port=u.port, scheme=u.scheme)
+
+ def get_config(self, key):
+ '''
+ Get configuration information from Consul using the provided key.
+ It should be in JSON form. Convert it to a dictionary
+ '''
+ (index, val) = self.ch.kv.get(key)
+ config = json.loads(val['Value']) # will raise ValueError if not JSON, let it propagate
+ return config
+
+ def get_service(self,service_name):
+ '''
+ Look up the service named service_name in Consul.
+ Return the service address and port.
+ '''
+ (index, val) = self.ch.catalog.service(service_name)
+ if len(val) > 0: # catalog.service returns an empty array if service not found
+ service = val[0] # Could be multiple listings, but we take the first
+ if ('ServiceAddress' in service) and (len(service['ServiceAddress']) > 0):
+ service_address = service['ServiceAddress'] # Most services should have this
+ else:
+ service_address = service['Address'] # "External" services will have this only
+ service_port = service['ServicePort']
+ else:
+ raise Exception('Could not find service information for "{0}"'.format(service_name))
+
+ return service_address, service_port
+
+ def add_to_entry(self, key, add_name, add_value):
+ '''
+ Find 'key' in consul.
+ Treat its value as a JSON string representing a dict.
+ Extend the dict by adding an entry with key 'add_name' and value 'add_value'.
+ Turn the resulting extended dict into a JSON string.
+ Store the string back into Consul under 'key'.
+ Watch out for conflicting concurrent updates.
+
+ Example:
+ Key 'xyz:dmaap' has the value '{"feed00": {"feed_url" : "http://example.com/feeds/999"}}'
+ add_to_entry('xyz:dmaap', 'topic00', {'topic_url' : 'http://example.com/topics/1229'})
+ should result in the value for key 'xyz:dmaap' in consul being updated to
+ '{"feed00": {"feed_url" : "http://example.com/feeds/999"}, "topic00" : {"topic_url" : "http://example.com/topics/1229"}}'
+ '''
+
+ while True: # do until update succeeds
+ (index, val) = self.ch.kv.get(key) # index gives version of key retrieved
+
+ if val is None: # no key yet
+ vstring = '{}'
+ mod_index = 0 # Use 0 as the cas index for initial insertion of the key
+ else:
+ vstring = val['Value']
+ mod_index = val['ModifyIndex']
+
+ # Build the updated dict
+ # Exceptions just propagate
+ v = json.loads(vstring)
+ v[add_name] = add_value
+ new_vstring = json.dumps(v)
+
+ updated = self.ch.kv.put(key, new_vstring, cas=mod_index) # if the key has changed since retrieval, this will return false
+ if updated:
+ break
+
+
+ def delete_entry(self,entry_name):
+ '''
+ Delete an entire key-value entry whose key is 'entry_name' from the Consul KV store.
+
+ Note that the kv.delete() operation always returns True,
+ whether there's an entry with key 'entry_name' exists or not. This doesn't seem like
+ a great design, but it means it's safe to try to delete the same entry repeatedly.
+
+ Note also in our application for this plugin, the uninstall workflow will always delete all of the topics and
+ feeds we've stored into the 'component_name:dmaap' entry.
+
+ Given the two foregoing notes, it is safe for this plugin to attempt to delete the entire
+ 'component_name:dmaap' entry any time it performs an 'unlink' operation for a publishes or
+ subscribes relationship. The first unlink will actually remove the entry, the subsequent ones
+ will harmlessly try to remove it again.
+
+ The 'correct' approach would be to have a delete_from_entry(self, key, delete_name) that fetches
+ the entry from Consul, removes only the topic or feed being unlinked, and then puts the resulting
+ entry back into Consul. It would be very similar to add_from_entry. When there's nothing left
+ in the entry, then the entire entry would be deleted.
+ '''
+ self.ch.kv.delete(entry_name)
diff --git a/dmaap/dmaap.yaml b/dmaap/dmaap.yaml
new file mode 100644
index 0000000..5b79f9b
--- /dev/null
+++ b/dmaap/dmaap.yaml
@@ -0,0 +1,202 @@
+# ============LICENSE_START====================================================
+# org.onap.dcaegen2
+# =============================================================================
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+
+# Types and relationships for DMaaP data router feeds
+
+tosca_definitions_version: cloudify_dsl_1_3
+
+plugins:
+ dmaapplugin:
+ executor: 'central_deployment_agent'
+ package_name: dmaap
+ package_version: 1.4.0
+
+
+node_types:
+
+ # Data Router feed to be created
+ dcaegen2.nodes.Feed:
+ derived_from: cloudify.nodes.Root
+
+ properties:
+ feed_name:
+ type: string
+ required: false
+ feed_version:
+ type: string
+ required: false
+ feed_description:
+ type: string
+ required: false
+ aspr_classification:
+ type: string
+ required: false
+ useExisting:
+ type: boolean
+ required: false
+
+ interfaces:
+ cloudify.interfaces.lifecycle:
+ create:
+ implementation:
+ dmaapplugin.dmaapplugin.dr_lifecycle.create_feed
+ delete:
+ implementation:
+ dmaapplugin.dmaapplugin.dr_lifecycle.delete_feed
+
+ # Existing Data Router feed to be used as target for publishing/subscribing
+ dcaegen2.nodes.ExistingFeed:
+ derived_from: cloudify.nodes.Root
+
+ properties:
+ feed_id:
+ type: string
+ required: false
+ feed_name:
+ type: string
+ required: false
+
+ interfaces:
+ cloudify.interfaces.lifecycle:
+ configure:
+ implementation:
+ dmaapplugin.dmaapplugin.dr_lifecycle.get_existing_feed
+
+ # Existing Global Data Router feed (created via Invenio) to be used as target for bridging
+ dcaegen2.nodes.ExternalTargetFeed:
+ derived_from: cloudify.nodes.Root
+
+ properties:
+ url:
+ type: string
+ required: true
+ username:
+ type: string
+ required: true
+ userpw:
+ type: string
+ required: true
+
+ # Global Data Router feed to be used as a source for bridging
+ # Has no properties
+ dcaegen2.nodes.ExternalSourceFeed:
+ derived_from: cloudify.nodes.Root
+
+ # Message Router topic to be created
+ dcaegen2.nodes.Topic:
+ derived_from: cloudify.nodes.Root
+
+ properties:
+ topic_name:
+ type: string
+ required: false
+ topic_description:
+ type: string
+ required: false
+ txenable:
+ type: boolean
+ required: false
+ replication_case:
+ type: string
+ required: false
+ global_mr_url:
+ type: string
+ required: false
+ useExisting:
+ type: boolean
+ required: false
+
+ interfaces:
+ cloudify.interfaces.lifecycle:
+ create:
+ implementation:
+ dmaapplugin.dmaapplugin.mr_lifecycle.create_topic
+ delete:
+ implementation:
+ dmaapplugin.dmaapplugin.mr_lifecycle.delete_topic
+
+ # Existing Message Router topic to be used as target for publishing/subscribing
+ dcaegen2.nodes.ExistingTopic:
+ derived_from: cloudify.nodes.Root
+
+ properties:
+ fqtn:
+ type: string
+ required: false
+ topic_name:
+ type: string
+ required: false
+
+ interfaces:
+ cloudify.interfaces.lifecycle:
+ configure:
+ implementation:
+ dmaapplugin.dmaapplugin.mr_lifecycle.get_existing_topic
+
+relationships:
+
+ dcaegen2.relationships.publish_files:
+ derived_from: cloudify.relationships.connected_to
+ target_interfaces:
+ cloudify.interfaces.relationship_lifecycle:
+ preconfigure: dmaapplugin.dmaapplugin.dr_relationships.add_dr_publisher
+ unlink: dmaapplugin.dmaapplugin.dr_relationships.delete_dr_publisher
+
+ dcaegen2.relationships.subscribe_to_files:
+ derived_from: cloudify.relationships.connected_to
+ target_interfaces:
+ cloudify.interfaces.relationship_lifecycle:
+ preconfigure: dmaapplugin.dmaapplugin.dr_relationships.add_dr_subscriber
+ unlink: dmaapplugin.dmaapplugin.dr_relationships.delete_dr_subscriber
+
+ dcaegen2.relationships.publish_events:
+ derived_from: cloudify.relationships.connected_to
+ target_interfaces:
+ cloudify.interfaces.relationship_lifecycle:
+ preconfigure: dmaapplugin.dmaapplugin.mr_relationships.add_mr_publisher
+ unlink: dmaapplugin.dmaapplugin.mr_relationships.delete_mr_client
+
+ dcaegen2.relationships.subscribe_to_events:
+ derived_from: cloudify.relationships.connected_to
+ target_interfaces:
+ cloudify.interfaces.relationship_lifecycle:
+ preconfigure: dmaapplugin.dmaapplugin.mr_relationships.add_mr_subscriber
+ unlink: dmaapplugin.dmaapplugin.mr_relationships.delete_mr_client
+
+ dcaegen2.relationships.bridges_to:
+ derived_from: cloudify.relationships.connected_to
+ target_interfaces:
+ cloudify.interfaces.relationship_lifecycle:
+ preconfigure: dmaapplugin.dmaapplugin.dr_bridge.create_dr_bridge
+ unlink: dmaapplugin.dmaapplugin.dr_bridge.remove_dr_bridge
+
+ dcaegen2.relationships.bridges_to_external:
+ derived_from: cloudify.relationships.connected_to
+ target_interfaces:
+ cloudify.interfaces.relationship_lifecycle:
+ preconfigure: dmaapplugin.dmaapplugin.dr_bridge.create_external_dr_bridge
+ unlink: dmaapplugin.dmaapplugin.dr_bridge.remove_dr_bridge
+
+ dcaegen2.relationships.bridges_from_external_to_internal:
+ derived_from: cloudify.relationships.connected_to
+ target_interfaces:
+ cloudify.interfaces.relationship_lifecycle:
+ preconfigure: dmaapplugin.dmaapplugin.dr_bridge.create_external_source_dr_bridge
+ unlink: dmaapplugin.dmaapplugin.dr_bridge.remove_dr_bridge
+
diff --git a/dmaap/dmaapcontrollerif/__init__.py b/dmaap/dmaapcontrollerif/__init__.py
new file mode 100644
index 0000000..611169f
--- /dev/null
+++ b/dmaap/dmaapcontrollerif/__init__.py
@@ -0,0 +1 @@
+# DMaaP Bus Controller interface library \ No newline at end of file
diff --git a/dmaap/dmaapcontrollerif/dmaap_requests.py b/dmaap/dmaapcontrollerif/dmaap_requests.py
new file mode 100644
index 0000000..039643d
--- /dev/null
+++ b/dmaap/dmaapcontrollerif/dmaap_requests.py
@@ -0,0 +1,310 @@
+# ============LICENSE_START====================================================
+# org.onap.dcaegen2
+# =============================================================================
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2020 Pantheon.tech. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+import requests
+
+### "Constants"
+FEEDS_PATH = '/feeds'
+PUBS_PATH = '/dr_pubs'
+SUBS_PATH = '/dr_subs'
+TOPICS_PATH = '/topics'
+CLIENTS_PATH = '/mr_clients'
+LOCATIONS_PATH = '/dcaeLocations'
+
+class DMaaPControllerHandle(object):
+ '''
+ A simple wrapper class to map DMaaP bus controller API calls into operations supported by the requests module
+ '''
+
+ def __init__(self, api_url, user, password, logger,
+ feeds_path = FEEDS_PATH,
+ pubs_path = PUBS_PATH,
+ subs_path = SUBS_PATH,
+ topics_path = TOPICS_PATH,
+ clients_path = CLIENTS_PATH):
+ '''
+ Constructor
+ '''
+ self.api_url = api_url # URL for the root of the Controller resource tree, no trailing "/"
+ self.auth = (user, password) # user name and password for HTTP basic auth
+ self.logger = logger
+ self.feeds_path = feeds_path
+ self.pubs_path = pubs_path
+ self.subs_path = subs_path
+ self.topics_path = topics_path
+ self.clients_path = clients_path
+
+
+ ### INTERNAL FUNCTIONS ###
+
+ def _make_url(self, path):
+ '''
+ Make a full URL given the path relative to the root
+ '''
+ if not path.startswith('/'):
+ path = '/' + path
+
+ return self.api_url + path
+
+ def _get_resource(self, path):
+ '''
+ Get the DMaaP resource at path, where path is relative to the root.
+ '''
+ url = self._make_url(path)
+ self.logger.info("Querying URL: {0}".format(url))
+ return requests.get(url, auth=self.auth)
+
+ def _create_resource(self, path, resource_content):
+ '''
+ Create a DMaaP resource by POSTing to the resource collection
+ identified by path (relative to root), using resource_content as the body of the post
+ '''
+ url = self._make_url(path)
+ self.logger.info("Posting to URL: {0} with body: {1}".format(url, resource_content))
+ return requests.post(url, auth=self.auth, json=resource_content)
+
+ def _delete_resource(self, path):
+ '''
+ Delete the DMaaP resource at path, where path is relative to the root.
+ '''
+ url = self._make_url(path)
+ self.logger.info("Deleting URL: {0}".format(url))
+ return requests.delete(url, auth=self.auth)
+
+ ### PUBLIC API ###
+
+ # Data Router Feeds
+ def create_feed(self, name, version=None, description=None, aspr_class=None, owner=None, useExisting=None):
+ '''
+ Create a DMaaP data router feed with the given feed name
+ and (optionally) feed version, feed description, ASPR classification,
+ owner, and useExisting flag
+ '''
+ feed_definition = {'feedName' : name}
+ if version:
+ feed_definition['feedVersion'] = version
+ if description:
+ feed_definition['feedDescription'] = description
+ if aspr_class:
+ feed_definition['asprClassification'] = aspr_class
+ if owner:
+ feed_definition['owner'] = owner
+ feeds_path_query = self.feeds_path
+ if useExisting == True: # It's a boolean!
+ feeds_path_query += "?useExisting=true"
+
+ return self._create_resource(feeds_path_query, feed_definition)
+
+ def get_feed_info(self, feed_id):
+ '''
+ Get the representation of the DMaaP data router feed whose feed id is feed_id.
+ '''
+ return self._get_resource("{0}/{1}".format(self.feeds_path, feed_id))
+
+ def get_feed_info_by_name(self, feed_name):
+ '''
+ Get the representation of the DMaaP data router feed whose feed name is feed_name.
+ '''
+ feeds = self._get_resource("{0}".format(self.feeds_path))
+ feed_list = feeds.json()
+ for feed in feed_list:
+ if feed["feedName"] == feed_name:
+ self.logger.info("Found feed with {0}".format(feed_name))
+ feed_id = feed["feedId"]
+ return self._get_resource("{0}/{1}".format(self.feeds_path, feed_id))
+
+ self.logger.info("feed_name {0} not found".format(feed_name))
+ return None
+
+ def delete_feed(self, feed_id):
+ '''
+ Delete the DMaaP data router feed whose feed id is feed_id.
+ '''
+ return self._delete_resource("{0}/{1}".format(self.feeds_path, feed_id))
+
+ # Data Router Publishers
+ def add_publisher(self, feed_id, location, username, password, status=None):
+ '''
+ Add a publisher to feed feed_id at location location with user, pass, and status
+ '''
+ publisher_definition = {
+ 'feedId' : feed_id,
+ 'dcaeLocationName' : location,
+ 'username' : username,
+ 'userpwd' : password
+ }
+
+ if status:
+ publisher_definition['status'] = status
+
+ return self._create_resource(self.pubs_path, publisher_definition)
+
+ def get_publisher_info(self, pub_id):
+ '''
+ Get the representation of the DMaaP data router publisher whose publisher id is pub_id
+ '''
+ return self._get_resource("{0}/{1}".format(self.pubs_path, pub_id))
+
+ def delete_publisher(self, pub_id):
+ '''
+ Delete the DMaaP data router publisher whose publisher id is id.
+ '''
+ return self._delete_resource("{0}/{1}".format(self.pubs_path, pub_id))
+
+
+ # Data Router SUbscrihers
+ def add_subscriber(self, feed_id, location, delivery_url, username, password, decompress, privileged, status=None):
+ '''
+ Add a publisher to feed feed_id at location location with user, pass, and status
+ '''
+ subscriber_definition = {
+ 'feedId' : feed_id,
+ 'dcaeLocationName' : location,
+ 'deliveryURL' : delivery_url,
+ 'username' : username,
+ 'userpwd' : password,
+ 'decompress': decompress,
+ 'privilegedSubscriber': privileged
+ }
+
+ if status:
+ subscriber_definition['status'] = status
+
+ return self._create_resource(self.subs_path, subscriber_definition)
+
+ def get_subscriber_info(self, sub_id):
+ '''
+ Get the representation of the DMaaP data router subscriber whose subscriber id is sub_id
+ '''
+ return self._get_resource("{0}/{1}".format(self.subs_path, sub_id))
+
+ def delete_subscriber(self, sub_id):
+ '''
+ Delete the DMaaP data router subscriber whose subscriber id is sub_id.
+ '''
+ return self._delete_resource("{0}/{1}".format(self.subs_path, sub_id))
+
+ # Message router topics
+ def create_topic(self, name, description = None, txenable = None, owner = None, replication_case = None, global_mr_url = None, useExisting = None):
+ '''
+ Create a message router topic with the topic name 'name' and optionally the topic_description
+ 'description', the 'txenable' flag, the 'useExisting' flag and the topic owner 'owner'.
+ '''
+ topic_definition = {'topicName' : name}
+ if description:
+ topic_definition['topicDescription'] = description
+ if owner:
+ topic_definition['owner'] = owner
+ if txenable != None: # It's a boolean!
+ topic_definition['txenable'] = txenable
+ if replication_case:
+ topic_definition['replicationCase'] = replication_case
+ if global_mr_url:
+ topic_definition['globalMrURL'] = global_mr_url
+ topics_path_query = self.topics_path
+ if useExisting == True: # It's a boolean!
+ topics_path_query += "?useExisting=true"
+
+ return self._create_resource(topics_path_query, topic_definition)
+
+ def get_topic_info(self, fqtn):
+ '''
+ Get information about the topic whose fully-qualified name is 'fqtn'
+ '''
+ return self._get_resource("{0}/{1}".format(self.topics_path, fqtn))
+
+ def get_topic_fqtn_by_name(self, topic_name):
+ '''
+ Get the representation of the DMaaP message router topic fqtn whose topic name is topic_name.
+ '''
+ topics = self._get_resource("{0}".format(self.topics_path))
+ topic_list = topics.json()
+ for topic in topic_list:
+ if topic["topicName"] == topic_name:
+ self.logger.info("Found existing topic with name {0}".format(topic_name))
+ fqtn = topic["fqtn"]
+ return fqtn
+
+ self.logger.info("topic_name {0} not found".format(topic_name))
+ return None
+
+ def delete_topic(self, fqtn):
+ '''
+ Delete the topic whose fully qualified name is 'fqtn'
+ '''
+ return self._delete_resource("{0}/{1}".format(self.topics_path, fqtn))
+
+ # Message route clients (publishers and subscribers
+ def create_client(self, fqtn, location, client_role, actions):
+ '''
+ Creates a client authorized to access the topic with fully-qualified name 'fqtn',
+ from the location 'location', using the AAF client role 'client_role'. The
+ client is authorized to perform actions in the list 'actions'. (Valid
+ values are 'pub', 'sub', and 'view'
+ '''
+ client_definition = {
+ 'fqtn' : fqtn,
+ 'dcaeLocationName' : location,
+ 'clientRole' : client_role,
+ 'action' : actions
+ }
+ return self._create_resource(self.clients_path, client_definition)
+
+ def get_client_info(self, client_id):
+ '''
+ Get client information for the client whose client ID is 'client_id'
+ '''
+ return self._get_resource("{0}/{1}".format(self.clients_path, client_id))
+
+ def delete_client(self, client_id):
+ '''
+ Delete the client whose client ID is 'client_id'
+ '''
+ return self._delete_resource("{0}/{1}".format(self.clients_path, client_id))
+
+ def get_dcae_locations(self, dcae_layer):
+ '''
+ Get the list of location names known to the DMaaP bus controller
+ whose "dcaeLayer" property matches dcae_layer and whose status is "VALID".
+ '''
+ # Do these as a separate step so things like 404 get reported precisely
+ locations = self._get_resource(LOCATIONS_PATH)
+ locations.raise_for_status()
+
+ # pull out location names for VALID locations with matching dcae_layer
+ return [location["dcaeLocationName"] for location in locations.json()
+ if location['dcaeLayer'] == dcae_layer
+ and location['status'] == 'VALID']
+
+ def get_dcae_central_locations(self):
+ '''
+ Get the list of location names known to the DMaaP bus controller
+ whose "dcaeLayer" property contains "central" (ignoring case)
+ and whose status is "VALID".
+ "dcaeLayer" contains "central" for central sites.
+ '''
+ # Do these as a separate step so things like 404 get reported precisely
+ locations = self._get_resource(LOCATIONS_PATH)
+ locations.raise_for_status()
+
+ # pull out location names for VALID central locations
+ return [location["dcaeLocationName"] for location in locations.json()
+ if 'central' in location['dcaeLayer'].lower()
+ and location['status'] == 'VALID']
+
diff --git a/dmaap/dmaapplugin/__init__.py b/dmaap/dmaapplugin/__init__.py
new file mode 100644
index 0000000..7a760d7
--- /dev/null
+++ b/dmaap/dmaapplugin/__init__.py
@@ -0,0 +1,82 @@
+# ============LICENSE_START====================================================
+# org.onap.dcaegen2
+# =============================================================================
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2020 Pantheon.tech. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+## Get parameters for accessing the DMaaP controller
+from consulif.consulif import ConsulHandle
+from cloudify.exceptions import NonRecoverableError
+import os
+
+os.environ["REQUESTS_CA_BUNDLE"]="/opt/onap/certs/cacert.pem" # This is to handle https request thru plugin
+
+CONSUL_HOST = "consul" # Should always be a local consul agent on Cloudify Manager
+DBCL_KEY_NAME = "dmaap-plugin" # Consul key containing DMaaP data bus credentials
+# In the ONAP Kubernetes environment, bus controller address is always "dmaap-bc", on port 8080 (http) and 8443 (https)
+ONAP_SERVICE_ADDRESS = "dmaap-bc"
+HTTP_PORT = "8080"
+HTTPS_PORT = "8443"
+
+try:
+ _ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, None)
+except Exception as e:
+ raise NonRecoverableError("Error getting ConsulHandle when configuring dmaap plugin: {0}".format(e))
+
+try:
+ config = _ch.get_config(DBCL_KEY_NAME)
+except Exception as e:
+ raise NonRecoverableError("Error getting config for '{0}' from ConsulHandle when configuring dmaap plugin: {1}".format(DBCL_KEY_NAME, e))
+
+try:
+ DMAAP_USER = config['dmaap']['username']
+except Exception as e:
+ raise NonRecoverableError("Error setting DMAAP_USER while configuring dmaap plugin: {0}".format(e))
+
+try:
+ DMAAP_PASS = config['dmaap']['password']
+except Exception as e:
+ raise NonRecoverableError("Error setting DMAAP_PASS while configuring dmaap plugin: {0}".format(e))
+
+try:
+ DMAAP_OWNER = config['dmaap']['owner']
+except Exception as e:
+ raise NonRecoverableError("Error setting DMAAP_OWNER while configuring dmaap plugin: {0}".format(e))
+
+try:
+ if 'protocol' in config['dmaap']:
+ DMAAP_PROTOCOL = config['dmaap']['protocol']
+ service_port = HTTP_PORT
+ else:
+ DMAAP_PROTOCOL = 'https' # Default to https (service discovery should give us this but doesn't
+ service_port = HTTPS_PORT
+except Exception as e:
+ raise NonRecoverableError("Error setting DMAAP_PROTOCOL while configuring dmaap plugin: {0}".format(e))
+
+try:
+ if 'path' in config['dmaap']:
+ DMAAP_PATH = config['dmaap']['path']
+ else:
+ DMAAP_PATH = 'webapi' # SHould come from service discovery but Consul doesn't support it
+except Exception as e:
+ raise NonRecoverableError("Error setting DMAAP_PATH while configuring dmaap plugin: {0}".format(e))
+
+try:
+ service_address = ONAP_SERVICE_ADDRESS
+ DMAAP_API_URL = '{0}://{1}:{2}/{3}'.format(DMAAP_PROTOCOL, service_address, service_port, DMAAP_PATH)
+except Exception as e:
+ raise NonRecoverableError("Error setting DMAAP_API_URL while configuring dmaap plugin: {0}".format(e))
+
diff --git a/dmaap/dmaapplugin/dmaaputils.py b/dmaap/dmaapplugin/dmaaputils.py
new file mode 100644
index 0000000..262abcb
--- /dev/null
+++ b/dmaap/dmaapplugin/dmaaputils.py
@@ -0,0 +1,29 @@
+# ============LICENSE_START====================================================
+# org.onap.dcaegen2
+# =============================================================================
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+# Utility functions
+
+import string
+from random import SystemRandom
+
+def random_string(n):
+ '''
+ Create a random alphanumeric string, n characters long.
+ '''
+ secureRandomGen = SystemRandom()
+ return ''.join(secureRandomGen.choice(string.ascii_lowercase + string.ascii_uppercase + string.digits) for x in range(n))
diff --git a/dmaap/dmaapplugin/dr_bridge.py b/dmaap/dmaapplugin/dr_bridge.py
new file mode 100644
index 0000000..a188667
--- /dev/null
+++ b/dmaap/dmaapplugin/dr_bridge.py
@@ -0,0 +1,199 @@
+# ============LICENSE_START====================================================
+# org.onap.dcaegen2
+# =============================================================================
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2020 Pantheon.tech. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+from cloudify import ctx
+from cloudify.decorators import operation
+from cloudify.exceptions import NonRecoverableError
+from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS
+from dmaapplugin.dmaaputils import random_string
+from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
+
+# Set up a subscriber to a source feed
+def _set_up_subscriber(dmc, source_feed_id, loc, delivery_url, username, userpw):
+ # Add subscriber to source feed
+ add_sub = dmc.add_subscriber(source_feed_id, loc, delivery_url, username, userpw)
+ add_sub.raise_for_status()
+ return add_sub.json()
+
+# Set up a publisher to a target feed
+def _set_up_publisher(dmc, target_feed_id, loc):
+ username = random_string(8)
+ userpw = random_string(16)
+ add_pub = dmc.add_publisher(target_feed_id, loc, username, userpw)
+ add_pub.raise_for_status()
+ pub_info = add_pub.json()
+ return pub_info["pubId"], username, userpw
+
+# Get a central location to use when creating a publisher or subscriber
+def _get_central_location(dmc):
+ locations = dmc.get_dcae_central_locations()
+ if len(locations) < 1:
+ raise Exception('No central location found for setting up DR bridging')
+ return locations[0] # We take the first one. Typically there will be two central locations
+
+
+# Set up a "bridge" between two feeds internal to DCAE
+# A source feed "bridges_to" a target feed, meaning that anything published to
+# the source feed will be delivered to subscribers to the target feed (as well as
+# to subscribers of the source feed).
+#
+# The bridge is established by first adding a publisher to the target feed. The result of doing this
+# is a publish URL and a set of publication credentials.
+#The publish URL and publication credentials are used to set up a subscriber to the source feed.
+#I.e., we tell the source feed to deliver to an endpoint which is actually a publish
+# endpoint for the target feed.
+@operation
+def create_dr_bridge(**kwargs):
+
+ try:
+
+ # Get source and target feed ids
+ if 'feed_id' in ctx.target.instance.runtime_properties:
+ target_feed_id = ctx.target.instance.runtime_properties['feed_id']
+ else:
+ raise Exception('Target feed has no feed_id property')
+ if 'feed_id' in ctx.source.instance.runtime_properties:
+ source_feed_id = ctx.source.instance.runtime_properties['feed_id']
+ else:
+ raise Exception('Source feed has no feed_id property')
+
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+
+ # Get a location to use when creating a publisher or subscriber--a central location seems reasonable
+ loc = _get_central_location(dmc)
+
+ ctx.logger.info('Creating bridge from feed {0} to feed {1} using location {2}'.format(source_feed_id, target_feed_id, loc))
+
+ # Add publisher to target feed
+ publisher_id, username, userpw = _set_up_publisher(dmc, target_feed_id, loc)
+ ctx.logger.info("Added publisher id {0} to target feed {1} with user {2}".format(publisher_id, target_feed_id, username))
+
+ # Add subscriber to source feed
+ delivery_url = ctx.target.instance.runtime_properties['publish_url']
+ subscriber_info = _set_up_subscriber(dmc, source_feed_id, loc, delivery_url, username, userpw)
+ subscriber_id = subscriber_info["subId"]
+ ctx.logger.info("Added subscriber id {0} to source feed {1} with delivery url {2}".format(subscriber_id, source_feed_id, delivery_url))
+
+ # Save the publisher and subscriber IDs on the source node, indexed by the target node id
+ ctx.source.instance.runtime_properties[ctx.target.node.id] = {"publisher_id": publisher_id, "subscriber_id": subscriber_id}
+
+ except Exception as e:
+ ctx.logger.error("Error creating bridge: {0}".format(e))
+ raise NonRecoverableError(e)
+
+# Set up a bridge from an internal DCAE feed to a feed in an external Data Router system
+# The target feed needs to be provisioned in the external Data Router system. A publisher
+# to that feed must also be set up in the external Data Router system. The publish URL,
+# username, and password need to be captured in a target node of type dcae.nodes.ExternalTargetFeed.
+# The bridge is established by setting up a subscriber to the internal DCAE source feed using the
+# external feed publisher parameters as delivery parameters for the subscriber.
+@operation
+def create_external_dr_bridge(**kwargs):
+ try:
+
+ # Make sure target feed has full set of properties
+ if 'url' in ctx.target.node.properties and 'username' in ctx.target.node.properties and 'userpw' in ctx.target.node.properties:
+ url = ctx.target.node.properties['url']
+ username = ctx.target.node.properties['username']
+ userpw = ctx.target.node.properties['userpw']
+ else:
+ raise Exception ("Target feed missing url, username, and/or user pw")
+
+ # Make sure source feed has a feed ID
+ if 'feed_id' in ctx.source.instance.runtime_properties:
+ source_feed_id = ctx.source.instance.runtime_properties['feed_id']
+ else:
+ raise Exception('Source feed has no feed_id property')
+
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+
+ # Get a central location to use when creating subscriber
+ loc = _get_central_location(dmc)
+
+ ctx.logger.info('Creating external bridge from feed {0} to external url {1} using location {2}'.format(source_feed_id, url, loc))
+
+ # Create subscription to source feed using properties of the external target feed
+ subscriber_info = _set_up_subscriber(dmc, source_feed_id, loc, url, username, userpw)
+ subscriber_id = subscriber_info["subId"]
+ ctx.logger.info("Added subscriber id {0} to source feed {1} with delivery url {2}".format(subscriber_id, source_feed_id, url))
+
+ # Save the subscriber ID on the source node, indexed by the target node id
+ ctx.source.instance.runtime_properties[ctx.target.node.id] = {"subscriber_id": subscriber_id}
+
+ except Exception as e:
+ ctx.logger.error("Error creating external bridge: {0}".format(e))
+ raise NonRecoverableError(e)
+
+# Set up a bridge from a feed in an external Data Router system to an internal DCAE feed.
+# The bridge is established by creating a publisher on the internal DCAE feed. Then a subscription
+# to the external feed is created through manual provisioning in the external Data Router system, using
+# the publish URL and the publisher username and password for the internal feed as the delivery parameters
+# for the external subscription.
+# In order to obtain the publish URL, publisher username, and password, a blueprint using this sort of
+# bridge will typically have an output that exposes the runtime_property set on the source node in this operation.
+@operation
+def create_external_source_dr_bridge(**kwargs):
+ try:
+ # Get target feed id
+ if 'feed_id' in ctx.target.instance.runtime_properties:
+ target_feed_id = ctx.target.instance.runtime_properties['feed_id']
+ else:
+ raise Exception('Target feed has no feed_id property')
+
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+
+ # Get a central location to use when creating a publisher
+ loc = _get_central_location(dmc)
+
+ # Create a publisher on the target feed
+ publisher_id, username, userpw = _set_up_publisher(dmc, target_feed_id, loc)
+
+ # Save the publisher info on the source node, indexed by the target node
+ ctx.source.instance.runtime_properties[ctx.target.node.id] = {"publisher_id": publisher_id, "url": ctx.target.instance.runtime_properties["publish_url"], "username": username, "userpw": userpw}
+
+ except Exception as e:
+ ctx.logger.error("Error creating external source bridge: {0}".format(e))
+
+# Remove the bridge between the relationship source and target.
+# For a bridge between 2 internal feeds, deletes the subscriber on the source feed and the publisher on the target feed.
+# For a bridge to an external target feed, deletes the subscriber on the source feed.
+# For a bridge from an external source feed, deletes the publisher on the target feed.
+@operation
+def remove_dr_bridge(**kwargs):
+ try:
+
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+
+ if ctx.target.node.id in ctx.source.instance.runtime_properties:
+
+ if 'subscriber_id' in ctx.source.instance.runtime_properties[ctx.target.node.id]:
+ # Delete the subscription for this bridge
+ ctx.logger.info("Removing bridge -- deleting subscriber {0}".format(ctx.source.instance.runtime_properties[ctx.target.node.id]['subscriber_id']))
+ dmc.delete_subscriber(ctx.source.instance.runtime_properties[ctx.target.node.id]['subscriber_id'])
+
+ if 'publisher_id' in ctx.source.instance.runtime_properties:
+ # Delete the publisher for this bridge
+ ctx.logger.info("Removing bridge -- deleting publisher {0}".format(ctx.source.instance.runtime_properties[ctx.target.node.id]['publisher_id']))
+ dmc.delete_publisher(ctx.source.instance.runtime_properties[ctx.target.node.id]['publisher_id'])
+
+ ctx.logger.info("Remove bridge from {0} to {1}".format(ctx.source.node.id, ctx.target.node.id))
+
+ except Exception as e:
+ ctx.logger.error("Error removing bridge: {0}".format(e))
+ # Let the uninstall workflow proceed--don't throw a NonRecoverableError
diff --git a/dmaap/dmaapplugin/dr_lifecycle.py b/dmaap/dmaapplugin/dr_lifecycle.py
new file mode 100644
index 0000000..af37977
--- /dev/null
+++ b/dmaap/dmaapplugin/dr_lifecycle.py
@@ -0,0 +1,153 @@
+# ============LICENSE_START====================================================
+# org.onap.dcaegen2
+# =============================================================================
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2020 Pantheon.tech. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+from cloudify import ctx
+from cloudify.decorators import operation
+from cloudify.exceptions import NonRecoverableError
+from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, DMAAP_OWNER
+from dmaapplugin.dmaaputils import random_string
+from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
+
+# Lifecycle operations for DMaaP Data Router feeds
+
+@operation
+def create_feed(**kwargs):
+ '''
+ Create a new data router feed
+ Expects "feed_name" to be set in node properties
+ If 'feed_name' is not set or is empty, generates a random one.
+ Allows "feed_version", "feed_description", "aspr_classification" and "useExisting" as optional properties
+ (Sets default values if not provided )
+ Sets instance runtime properties:
+ Note that 'useExisting' is a flag indicating whether DBCL will use existing feed if the feed already exists.
+ - "feed_id"
+ - "publish_url"
+ - "log_url"
+
+ '''
+ try:
+ # Make sure there's a feed_name
+ feed_name = ctx.node.properties.get("feed_name")
+ if not (feed_name and feed_name.strip()):
+ feed_name = random_string(12)
+
+ # Set defaults/placeholders for the optional properties for the feed
+ if "feed_version" in ctx.node.properties:
+ feed_version = ctx.node.properties["feed_version"]
+ else:
+ feed_version = "0.0"
+ if "feed_description" in ctx.node.properties:
+ feed_description = ctx.node.properties["feed_description"]
+ else:
+ feed_description = "No description provided"
+ if "aspr_classification" in ctx.node.properties:
+ aspr_classification = ctx.node.properties["aspr_classification"]
+ else:
+ aspr_classification = "unclassified"
+ if "useExisting" in ctx.node.properties:
+ useExisting = ctx.node.properties["useExisting"]
+ else:
+ useExisting = False
+
+ # Make the request to the controller
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ ctx.logger.info("Attempting to create feed name {0}".format(feed_name))
+ f = dmc.create_feed(feed_name, feed_version, feed_description, aspr_classification, DMAAP_OWNER, useExisting)
+ f.raise_for_status()
+
+ # Capture important properties from the result
+ feed = f.json()
+ ctx.instance.runtime_properties["feed_id"] = feed["feedId"]
+ ctx.instance.runtime_properties["publish_url"] = feed["publishURL"]
+ ctx.instance.runtime_properties["log_url"] = feed["logURL"]
+ ctx.logger.info("Created feed name {0} with feed id {1}".format(feed_name, feed["feedId"]))
+
+ except Exception as e:
+ ctx.logger.error("Error creating feed: {er}".format(er=e))
+ raise NonRecoverableError(e)
+
+
+@operation
+def get_existing_feed(**kwargs):
+ '''
+ Find information for an existing data router feed
+ Expects "feed_id" to be set in node properties -- uniquely identifies the feed
+ Sets instance runtime properties:
+ - "feed_id"
+ - "publish_url"
+ - "log_url"
+ '''
+
+ try:
+ # Make the lookup request to the controller
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ ctx.logger.info("DMaaPControllerHandle() returned")
+ feed_id_input = False
+ if "feed_id" in ctx.node.properties:
+ feed_id_input = True
+ f = dmc.get_feed_info(ctx.node.properties["feed_id"])
+ elif "feed_name" in ctx.node.properties:
+ feed_name = ctx.node.properties["feed_name"]
+ f = dmc.get_feed_info_by_name(feed_name)
+ if f is None:
+ ctx.logger.error("Not find existing feed with feed name {0}".format(feed_name))
+ raise ValueError("Not find existing feed with feed name " + feed_name)
+ else:
+ raise ValueError("Either feed_id or feed_name must be defined to get existing feed")
+
+ f.raise_for_status()
+
+ # Capture important properties from the result
+ feed = f.json()
+ feed_id = feed["feedId"]
+ ctx.instance.runtime_properties["feed_id"] = feed_id # Just to be consistent with newly-created node, above
+ ctx.instance.runtime_properties["publish_url"] = feed["publishURL"]
+ ctx.instance.runtime_properties["log_url"] = feed["logURL"]
+ if feed_id_input:
+ ctx.logger.info("Found existing feed with feed id {0}".format(ctx.node.properties["feed_id"]))
+ else:
+ ctx.logger.info("Found existing feed with feed name {0}".format(ctx.node.properties["feed_name"]))
+
+ except ValueError as e:
+ ctx.logger.error("{er}".format(er=e))
+ raise NonRecoverableError(e)
+ except Exception as e:
+ if feed_id_input:
+ ctx.logger.error("Error getting existing feed id {id}: {er}".format(id=ctx.node.properties["feed_id"],er=e))
+ else:
+ ctx.logger.error("Error getting existing feed name {name}: {er}".format(name=ctx.node.properties["feed_name"],er=e))
+ raise NonRecoverableError(e)
+
+
+@operation
+def delete_feed(**kwargs):
+ '''
+ Delete a feed
+ Expects "feed_id" to be set on the instance's runtime properties
+ '''
+ try:
+ # Make the lookup request to the controllerid=ctx.node.properties["feed_id"]
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ f = dmc.delete_feed(ctx.instance.runtime_properties["feed_id"])
+ f.raise_for_status()
+ ctx.logger.info("Deleting feed id {0}".format(ctx.instance.runtime_properties["feed_id"]))
+
+ except Exception as e:
+ ctx.logger.error("Error deleting feed id {id}: {er}".format(id=ctx.instance.runtime_properties["feed_id"],er=e))
+ # don't raise a NonRecoverable error here--let the uninstall workflow continue
diff --git a/dmaap/dmaapplugin/dr_relationships.py b/dmaap/dmaapplugin/dr_relationships.py
new file mode 100644
index 0000000..f1ff986
--- /dev/null
+++ b/dmaap/dmaapplugin/dr_relationships.py
@@ -0,0 +1,219 @@
+# ============LICENSE_START====================================================
+# org.onap.dcaegen2
+# =============================================================================
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2020 Pantheon.tech. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+from cloudify import ctx
+from cloudify.decorators import operation
+from cloudify.exceptions import NonRecoverableError
+from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, CONSUL_HOST
+from dmaapplugin.dmaaputils import random_string
+from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
+from consulif.consulif import ConsulHandle
+
+# Lifecycle operations for DMaaP Data Router
+# publish and subscribe relationships
+
+@operation
+def add_dr_publisher(**kwargs):
+ '''
+ Sets up the source of the publishes_relationship as a publisher to the feed that
+ is the target of the relationship
+ Assumes target (the feed) has the following runtime properties set
+ - feed_id
+ - log_url
+ - publish_url
+ Assumes source (the publisher) has a runtime property whose name matches the node name of the feed.
+ This is a dictionary containing one property:
+ - location (the dcaeLocationName to pass when adding the publisher to the feed)
+ Generates a user name and password that the publisher will need to use when publishing
+ Adds the following properties to the dictionary above:
+ - publish_url
+ - log_url
+ - username
+ - password
+ '''
+ try:
+ # Make sure we have a name under which to store DMaaP configuration
+ # Check early so we don't needlessly create DMaaP entities
+ if 'service_component_name' not in ctx.source.instance.runtime_properties:
+ raise Exception("Source node does not have 'service_component_name' in runtime_properties")
+
+ target_feed = ctx.target.node.id
+ ctx.logger.info("Attempting to add publisher {0} to feed {1}".format(ctx.source.node.id, target_feed))
+
+ # Set up the parameters for the add_publisher request to the DMaaP bus controller
+ feed_id = ctx.target.instance.runtime_properties["feed_id"]
+ location = ctx.source.instance.runtime_properties[target_feed]["location"]
+ username = random_string(8)
+ password = random_string(16)
+
+ # Make the request to add the publisher to the feed
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ add_pub = dmc.add_publisher(feed_id, location, username, password)
+ add_pub.raise_for_status()
+ publisher_info = add_pub.json()
+ publisher_id = publisher_info["pubId"]
+ ctx.logger.info("Added publisher id {0} to feed {1} at {2}, with user {3}, pass {4}".format(publisher_id, feed_id, location, username, password))
+
+ # Set runtime properties on the source
+ ctx.source.instance.runtime_properties[target_feed] = {
+ "publisher_id" : publisher_id,
+ "location" : location,
+ "publish_url" : ctx.target.instance.runtime_properties["publish_url"],
+ "log_url" : ctx.target.instance.runtime_properties["log_url"],
+ "username" : username,
+ "password" : password
+ }
+
+ # Set key in Consul
+ ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
+ cpy = dict(ctx.source.instance.runtime_properties[target_feed])
+ ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_feed, cpy)
+
+ except Exception as e:
+ ctx.logger.error("Error adding publisher to feed: {er}".format(er=e))
+ raise NonRecoverableError(e)
+
+
+@operation
+def delete_dr_publisher(**kwargs):
+ '''
+ Deletes publisher (the source of the publishes_files relationship)
+ from the feed (the target of the relationship).
+ Assumes that the 'publisher_id' property was added to the dictionary of feed-related properties,
+ when the publisher was added to the feed.
+ '''
+
+ try:
+ # Make sure we have a name under which to store DMaaP configuration
+ # Check early so we don't needlessly create DMaaP entities
+ if 'service_component_name' not in ctx.source.instance.runtime_properties:
+ raise Exception("Source node does not have 'service_component_name' in runtime_properties")
+
+ # Get the publisher id
+ target_feed = ctx.target.node.id
+ publisher_id = ctx.source.instance.runtime_properties[target_feed]["publisher_id"]
+ ctx.logger.info("Attempting to delete publisher {0}".format(publisher_id))
+
+ # Make the request
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ del_result = dmc.delete_publisher(publisher_id)
+ del_result.raise_for_status()
+
+ ctx.logger.info("Deleted publisher {0}".format(publisher_id))
+
+ # Attempt to remove the entire ":dmaap" entry from the Consul KV store
+ # Will quietly do nothing if the entry has already been removed
+ ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
+ ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']))
+
+ except Exception as e:
+ ctx.logger.error("Error deleting publisher: {er}".format(er=e))
+ # don't raise a NonRecoverable error here--let the uninstall workflow continue
+
+
+@operation
+def add_dr_subscriber(**kwargs):
+ '''
+ Sets up the source of the subscribes_to_files relationship as a subscriber to the
+ feed that is the target of the relationship.
+ Assumes target (the feed) has the following runtime property set
+ - feed_id
+ Assumes source (the subscriber) has a runtime property whose name matches the node name of the feed.
+ This is a dictionary containing the following properties:
+ - location (the dcaeLocationName to pass when adding the publisher to the feed)
+ - delivery_url (the URL to which data router will deliver files)
+ - username (the username data router will use when delivering files)
+ - password (the password data router will use when delivering files)
+ Adds a property to the dictionary above:
+ - subscriber_id (used to delete the subscriber in the uninstall workflow
+ '''
+ try:
+ target_feed = ctx.target.node.id
+ ctx.logger.info("Attempting to add subscriber {0} to feed {1}".format(ctx.source.node.id, target_feed))
+
+ # Get the parameters for the call
+ feed_id = ctx.target.instance.runtime_properties["feed_id"]
+ feed = ctx.source.instance.runtime_properties[target_feed]
+ location = feed["location"]
+ delivery_url = feed["delivery_url"]
+ username = feed["username"]
+ password = feed["password"]
+ decompress = feed["decompress"] if "decompress" in feed else False
+ privileged = feed["privileged"] if "privileged" in feed else False
+
+ # Make the request to add the subscriber to the feed
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ add_sub = dmc.add_subscriber(feed_id, location, delivery_url,username, password, decompress, privileged)
+ add_sub.raise_for_status()
+ subscriber_info = add_sub.json()
+ subscriber_id = subscriber_info["subId"]
+ ctx.logger.info("Added subscriber id {0} to feed {1} at {2}".format(subscriber_id, feed_id, location))
+
+ # Add subscriber_id to the runtime properties
+ # ctx.source.instance.runtime_properties[target_feed]["subscriber_id"] = subscriber_id
+ ctx.source.instance.runtime_properties[target_feed] = {
+ "subscriber_id": subscriber_id,
+ "location" : location,
+ "delivery_url" : delivery_url,
+ "username" : username,
+ "password" : password,
+ "decompress": decompress,
+ "privilegedSubscriber": privileged
+ }
+ ctx.logger.info("on source: {0}".format(ctx.source.instance.runtime_properties[target_feed]))
+
+ # Set key in Consul
+ ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
+ cpy = dict(ctx.source.instance.runtime_properties[target_feed])
+ ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_feed, cpy)
+
+ except Exception as e:
+ ctx.logger.error("Error adding subscriber to feed: {er}".format(er=e))
+ raise NonRecoverableError(e)
+
+
+@operation
+def delete_dr_subscriber(**kwargs):
+ '''
+ Deletes subscriber (the source of the subscribes_to_files relationship)
+ from the feed (the target of the relationship).
+ Assumes that the source node's runtime properties dictionary for the target feed
+ includes 'subscriber_id', set when the publisher was added to the feed.
+ '''
+ try:
+ # Get the subscriber id
+ target_feed = ctx.target.node.id
+ subscriber_id = ctx.source.instance.runtime_properties[target_feed]["subscriber_id"]
+ ctx.logger.info("Attempting to delete subscriber {0} from feed {1}".format(subscriber_id, target_feed))
+
+ # Make the request
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ del_result = dmc.delete_subscriber(subscriber_id)
+ del_result.raise_for_status()
+
+ ctx.logger.info("Deleted subscriber {0}".format(subscriber_id))
+
+ # Attempt to remove the entire ":dmaap" entry from the Consul KV store
+ # Will quietly do nothing if the entry has already been removed
+ ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
+ ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']))
+
+ except Exception as e:
+ ctx.logger.error("Error deleting subscriber: {er}".format(er=e))
+ # don't raise a NonRecoverable error here--let the uninstall workflow continue
diff --git a/dmaap/dmaapplugin/mr_lifecycle.py b/dmaap/dmaapplugin/mr_lifecycle.py
new file mode 100644
index 0000000..6fe3023
--- /dev/null
+++ b/dmaap/dmaapplugin/mr_lifecycle.py
@@ -0,0 +1,143 @@
+# ============LICENSE_START====================================================
+# org.onap.dcaegen2
+# =============================================================================
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2020 Pantheon.tech. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+from cloudify import ctx
+from cloudify.decorators import operation
+from cloudify.exceptions import NonRecoverableError
+from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, DMAAP_OWNER
+from dmaapplugin.dmaaputils import random_string
+from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
+
+# Lifecycle operations for DMaaP Message Router topics
+@operation
+def create_topic(**kwargs):
+ '''
+ Creates a message router topic.
+ Allows 'topic_name', 'topic_description', 'txenable', 'replication_case', 'global_mr_url',
+ and 'useExisting' as optional node properties. If 'topic_name' is not set,
+ generates a random one.
+ Sets 'fqtn' in the instance runtime_properties.
+ Note that 'txenable' is a Message Router flag indicating whether transactions
+ are enabled on the topic.
+ Note that 'useExisting' is a flag indicating whether DBCL will use existing topic if
+ the topic already exists.
+ '''
+ try:
+ # Make sure there's a topic_name
+ if "topic_name" in ctx.node.properties:
+ topic_name = ctx.node.properties["topic_name"]
+ if topic_name == '' or topic_name.isspace():
+ topic_name = random_string(12)
+ else:
+ topic_name = random_string(12)
+
+ # Make sure there's a topic description
+ if "topic_description" in ctx.node.properties:
+ topic_description = ctx.node.properties["topic_description"]
+ else:
+ topic_description = "No description provided"
+
+ # ..and the truly optional setting
+ if "txenable" in ctx.node.properties:
+ txenable = ctx.node.properties["txenable"]
+ else:
+ txenable= False
+
+ if "replication_case" in ctx.node.properties:
+ replication_case = ctx.node.properties["replication_case"]
+ else:
+ replication_case = None
+
+ if "global_mr_url" in ctx.node.properties:
+ global_mr_url = ctx.node.properties["global_mr_url"]
+ else:
+ global_mr_url = None
+
+ if "useExisting" in ctx.node.properties:
+ useExisting = ctx.node.properties["useExisting"]
+ else:
+ useExisting = False
+
+ # Make the request to the controller
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ ctx.logger.info("Attempting to create topic name {0}".format(topic_name))
+ t = dmc.create_topic(topic_name, topic_description, txenable, DMAAP_OWNER, replication_case, global_mr_url, useExisting)
+ t.raise_for_status()
+
+ # Capture important properties from the result
+ topic = t.json()
+ ctx.instance.runtime_properties["fqtn"] = topic["fqtn"]
+
+ except Exception as e:
+ ctx.logger.error("Error creating topic: {er}".format(er=e))
+ raise NonRecoverableError(e)
+
+@operation
+def get_existing_topic(**kwargs):
+ '''
+ Get data for an existing topic.
+ Expects 'fqtn' as a node property.
+ Copies this property to 'fqtn' in runtime properties for consistency
+ with a newly-created topic.
+ While there's no real need to make a call to the DMaaP bus controller,
+ we do so just to make sure the fqtn is known to the controller, so we
+ don't run into problems when we try to add a publisher or subscriber later.
+ '''
+ try:
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ fqtn_input = False
+ if "fqtn" in ctx.node.properties:
+ fqtn = ctx.node.properties["fqtn"]
+ fqtn_input = True
+ elif "topic_name" in ctx.node.properties:
+ topic_name = ctx.node.properties["topic_name"]
+ ctx.logger.info("Attempting to get fqtn for existing topic {0}".format(topic_name))
+ fqtn = dmc.get_topic_fqtn_by_name(topic_name)
+ if fqtn is None:
+ raise ValueError("Not find existing topic with name " + topic_name)
+ else:
+ ctx.logger.error("Not find existing topic with name {0}".format(topic_name))
+ raise ValueError("Either fqtn or topic_name must be defined to get existing topic")
+
+ ctx.logger.info("Attempting to get info for existing topic {0}".format(fqtn))
+ t = dmc.get_topic_info(fqtn)
+ t.raise_for_status()
+
+ ctx.instance.runtime_properties["fqtn"] = fqtn
+
+ except Exception as e:
+ ctx.logger.error("Error getting existing topic: {er}".format(er=e))
+ raise NonRecoverableError(e)
+
+@operation
+def delete_topic(**kwargs):
+ '''
+ Delete the topic. Expects the instance runtime property "fqtn" to have been
+ set when the topic was created.
+ '''
+ try:
+ fqtn = ctx.instance.runtime_properties["fqtn"]
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ ctx.logger.info("Attempting to delete topic {0}".format(fqtn))
+ t = dmc.delete_topic(fqtn)
+ t.raise_for_status()
+
+ except Exception as e:
+ ctx.logger.error("Error getting existing topic: {er}".format(er=e))
+ # don't raise a NonRecoverable error here--let the uninstall workflow continue
diff --git a/dmaap/dmaapplugin/mr_relationships.py b/dmaap/dmaapplugin/mr_relationships.py
new file mode 100644
index 0000000..34d02e2
--- /dev/null
+++ b/dmaap/dmaapplugin/mr_relationships.py
@@ -0,0 +1,119 @@
+# ============LICENSE_START====================================================
+# org.onap.dcaegen2
+# =============================================================================
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+from cloudify import ctx
+from cloudify.decorators import operation
+from cloudify.exceptions import NonRecoverableError
+from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, DMAAP_OWNER, CONSUL_HOST
+from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
+from consulif.consulif import ConsulHandle
+
+# Message router relationship operations
+
+def _add_mr_client(ctype, actions):
+ '''
+ Adds the node represented by 'source' as a client (publisher or subscriber) to
+ to topic represented by the 'target' node. The list of actions in 'actions'
+ determines whether the client is a subscriber or a publisher.
+
+ Assumes target (the topic) has the following runtime property set
+ - fqtn
+ Assumes source (the client) has a runtime property whose name matches the node name of the feed.
+ This is a dictionary containing the following properties:
+ - location (the dcaeLocationName to pass when adding the client to the topic)
+ - client_role (the AAF client role under which the client will access the topic)
+ Adds two properties to the dictionary above:
+ - topic_url (the URL that the client can use to access the topic)
+ - client_id (used to delete the client in the uninstall workflow)
+ '''
+ try:
+ # Make sure we have a name under which to store DMaaP configuration
+ # Check early so we don't needlessly create DMaaP entities
+ if 'service_component_name' not in ctx.source.instance.runtime_properties:
+ raise Exception("Source node does not have 'service_component_name' in runtime_properties")
+
+ target_topic = ctx.target.node.id # Key for the source's dictionary with topic-related info
+ fqtn = ctx.target.instance.runtime_properties["fqtn"]
+ ctx.logger.info("Attempting to add {0} as {1} to topic {2}".format(ctx.source.node.id, ctype, fqtn))
+
+ # Get the parameters needed for adding the client
+ location = ctx.source.instance.runtime_properties[target_topic]["location"]
+ client_role = ctx.source.instance.runtime_properties[target_topic]["client_role"]
+
+ # Make the request to add the client to the topic
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ c = dmc.create_client(fqtn, location, client_role, actions)
+ c.raise_for_status()
+ client_info = c.json()
+ client_id = client_info["mrClientId"]
+ topic_url = client_info["topicURL"]
+
+ # Update source's runtime properties
+ #ctx.source.instance.runtime_properties[target_topic]["topic_url"] = topic_url
+ #ctx.source.instance.runtime_properties[target_topic]["client_id"] = client_id
+ ctx.source.instance.runtime_properties[target_topic] = {
+ "topic_url" : topic_url,
+ "client_id" : client_id,
+ "location" : location,
+ "client_role" : client_role
+ }
+
+ ctx.logger.info("Added {0} id {1} to feed {2} at {3}".format(ctype, client_id, fqtn, location))
+
+ # Set key in Consul
+ ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
+ ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_topic, ctx.source.instance.runtime_properties[target_topic])
+
+ except Exception as e:
+ ctx.logger.error("Error adding client to feed: {er}".format(er=e))
+ raise NonRecoverableError(e)
+
+@operation
+def add_mr_publisher(**kwargs):
+ _add_mr_client("publisher", ["view", "pub"])
+
+@operation
+def add_mr_subscriber(**kwargs):
+ _add_mr_client("subscriber", ["view", "sub"])
+
+@operation
+def delete_mr_client(**kwargs):
+ '''
+ Delete the client (publisher or subscriber).
+ Expect property 'client_id' to have been set in the instance's runtime_properties
+ when the client was created.
+ '''
+ try:
+ target_topic = ctx.target.node.id
+ client_id = ctx.source.instance.runtime_properties[target_topic]["client_id"]
+ ctx.logger.info("Attempting to delete client {0} ".format(client_id))
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ c = dmc.delete_client(client_id)
+ c.raise_for_status()
+
+ ctx.logger.info("Deleted client {0}".format(client_id))
+
+ # Attempt to remove the entire ":dmaap" entry from the Consul KV store
+ # Will quietly do nothing if the entry has already been removed
+ ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
+ ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']))
+
+ except Exception as e:
+ ctx.logger.error("Error deleting MR client: {er}".format(er=e))
+ # don't raise a NonRecoverable error here--let the uninstall workflow continue
+
diff --git a/dmaap/pom.xml b/dmaap/pom.xml
new file mode 100644
index 0000000..afc6089
--- /dev/null
+++ b/dmaap/pom.xml
@@ -0,0 +1,327 @@
+<?xml version="1.0"?>
+<!--
+============LICENSE_START=======================================================
+org.onap.dcaegen2
+================================================================================
+Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
+================================================================================
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+============LICENSE_END=========================================================
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.onap.dcaegen2.platform</groupId>
+ <artifactId>plugins</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ </parent>
+
+ <!--- CHANGE THE FOLLOWING 3 OBJECTS for your own repo -->
+ <groupId>org.onap.dcaegen2.platform.plugins</groupId>
+ <artifactId>dmaap</artifactId>
+ <name>dmaap</name>
+
+ <version>1.5.0-SNAPSHOT</version>
+ <url>http://maven.apache.org</url>
+ <properties>
+ <!-- name from the setup.py file -->
+ <plugin.name>dmaap</plugin.name>
+ <!-- path to directory containing the setup.py relative to this file -->
+ <plugin.subdir>.</plugin.subdir>
+ <!-- path of types file itself relative to this file -->
+ <typefile.source>dmaap.yaml</typefile.source>
+ <!-- path, in repo, to store type file -->
+ <typefile.dest>type_files/dmaap/dmaap.yaml</typefile.dest>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <sonar.sources>.</sonar.sources>
+ <sonar.junit.reportsPath>xunit-results.xml</sonar.junit.reportsPath>
+ <sonar.python.coverage.reportPaths>coverage.xml</sonar.python.coverage.reportPaths>
+ <sonar.language>py</sonar.language>
+ <sonar.pluginName>Python</sonar.pluginName>
+ <sonar.inclusions>**/*.py</sonar.inclusions>
+ <sonar.exclusions>tests/*,setup.py</sonar.exclusions>
+ </properties>
+
+ <build>
+ <finalName>${project.artifactId}-${project.version}</finalName>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>sonar-maven-plugin</artifactId>
+ <version>2.7.1</version>
+ </plugin>
+
+ <!-- nexus-staging-maven-plugin is called during deploy phase by default behavior.
+ we do not need it -->
+ <plugin>
+ <groupId>org.sonatype.plugins</groupId>
+ <artifactId>nexus-staging-maven-plugin</artifactId>
+ <version>1.6.7</version>
+ <configuration>
+ <skipNexusStagingDeployMojo>true</skipNexusStagingDeployMojo>
+ </configuration>
+ </plugin>
+
+ <!-- maven-deploy-plugin is called during deploy but we do not need it -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-deploy-plugin</artifactId>
+ <version>2.8</version>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+
+ <plugins>
+
+ <!-- first disable the default Java plugins at various stages -->
+ <!-- maven-resources-plugin is called during "*resource" phases by default behavior. it prepares the resources
+ dir. we do not need it -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>2.6</version>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+
+ <!-- maven-compiler-plugin is called during "compile" phases by default behavior. we do not need it -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+
+ <!-- maven-jar-plugin is called during "compile" phase by default behavior. we do not need it -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <execution>
+ <id>default-jar</id>
+ <phase/>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- maven-install-plugin is called during "install" phase by default behavior. it tries to copy stuff under
+ target dir to ~/.m2. we do not need it -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-install-plugin</artifactId>
+ <version>2.4</version>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+
+ <!-- maven-surefire-plugin is called during "test" phase by default behavior. it triggers junit test.
+ we do not need it -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.12.4</version>
+ <configuration>
+ <skipTests>true</skipTests>
+ </configuration>
+ </plugin>
+
+ <!-- now we configure custom action (calling a script) at various lifecycle phases -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>clean phase script</id>
+ <phase>clean</phase>
+ <goals><goal>exec</goal></goals>
+ <configuration>
+ <executable>${session.executionRootDirectory}/mvn-phase-script.sh</executable>
+ <arguments>
+ <argument>${project.artifactId}</argument>
+ <argument>clean</argument>
+ </arguments>
+ <environmentVariables>
+ <!-- make mvn properties as env for our script -->
+ <MVN_PROJECT_GROUPID>${project.groupId}</MVN_PROJECT_GROUPID>
+ <MVN_PROJECT_ARTIFACTID>${project.artifactId}</MVN_PROJECT_ARTIFACTID>
+ <MVN_PROJECT_VERSION>${project.version}</MVN_PROJECT_VERSION>
+ <MVN_NEXUSPROXY>${onap.nexus.url}</MVN_NEXUSPROXY>
+ <MVN_RAWREPO_BASEURL_UPLOAD>${onap.nexus.rawrepo.baseurl.upload}</MVN_RAWREPO_BASEURL_UPLOAD>
+ <MVN_RAWREPO_BASEURL_DOWNLOAD>${onap.nexus.rawrepo.baseurl.download}</MVN_RAWREPO_BASEURL_DOWNLOAD>
+ <MVN_RAWREPO_SERVERID>${onap.nexus.rawrepo.serverid}</MVN_RAWREPO_SERVERID>
+ <PLUGIN_NAME>${plugin.name}</PLUGIN_NAME>
+ <PLUGIN_SUBDIR>${plugin.subdir}</PLUGIN_SUBDIR>
+ </environmentVariables>
+ </configuration>
+ </execution>
+
+ <execution>
+ <id>generate-sources script</id>
+ <phase>generate-sources</phase>
+ <goals><goal>exec</goal></goals>
+ <configuration>
+ <executable>mvn-phase-script.sh</executable>
+ <arguments>
+ <argument>${project.artifactId}</argument>
+ <argument>generate-sources</argument>
+ </arguments>
+ <environmentVariables>
+ <!-- make mvn properties as env for our script -->
+ <MVN_PROJECT_GROUPID>${project.groupId}</MVN_PROJECT_GROUPID>
+ <MVN_PROJECT_ARTIFACTID>${project.artifactId}</MVN_PROJECT_ARTIFACTID>
+ <MVN_PROJECT_VERSION>${project.version}</MVN_PROJECT_VERSION>
+ <MVN_NEXUSPROXY>${onap.nexus.url}</MVN_NEXUSPROXY>
+ <MVN_RAWREPO_BASEURL_UPLOAD>${onap.nexus.rawrepo.baseurl.upload}</MVN_RAWREPO_BASEURL_UPLOAD>
+ <MVN_RAWREPO_BASEURL_DOWNLOAD>${onap.nexus.rawrepo.baseurl.download}</MVN_RAWREPO_BASEURL_DOWNLOAD>
+ <MVN_RAWREPO_SERVERID>${onap.nexus.rawrepo.serverid}</MVN_RAWREPO_SERVERID>
+ </environmentVariables>
+ </configuration>
+ </execution>
+
+ <execution>
+ <id>compile script</id>
+ <phase>compile</phase>
+ <goals><goal>exec</goal></goals>
+ <configuration>
+ <executable>mvn-phase-script.sh</executable>
+ <arguments>
+ <argument>${project.artifactId}</argument>
+ <argument>compile</argument>
+ </arguments>
+ <environmentVariables>
+ <!-- make mvn properties as env for our script -->
+ <MVN_PROJECT_GROUPID>${project.groupId}</MVN_PROJECT_GROUPID>
+ <MVN_PROJECT_ARTIFACTID>${project.artifactId}</MVN_PROJECT_ARTIFACTID>
+ <MVN_PROJECT_VERSION>${project.version}</MVN_PROJECT_VERSION>
+ <MVN_NEXUSPROXY>${onap.nexus.url}</MVN_NEXUSPROXY>
+ <MVN_RAWREPO_BASEURL_UPLOAD>${onap.nexus.rawrepo.baseurl.upload}</MVN_RAWREPO_BASEURL_UPLOAD>
+ <MVN_RAWREPO_BASEURL_DOWNLOAD>${onap.nexus.rawrepo.baseurl.download}</MVN_RAWREPO_BASEURL_DOWNLOAD>
+ <MVN_RAWREPO_SERVERID>${onap.nexus.rawrepo.serverid}</MVN_RAWREPO_SERVERID>
+ </environmentVariables>
+ </configuration>
+ </execution>
+
+ <execution>
+ <id>package script</id>
+ <phase>package</phase>
+ <goals><goal>exec</goal></goals>
+ <configuration>
+ <executable>mvn-phase-script.sh</executable>
+ <arguments>
+ <argument>${project.artifactId}</argument>
+ <argument>package</argument>
+ </arguments>
+ <environmentVariables>
+ <!-- make mvn properties as env for our script -->
+ <MVN_PROJECT_GROUPID>${project.groupId}</MVN_PROJECT_GROUPID>
+ <MVN_PROJECT_ARTIFACTID>${project.artifactId}</MVN_PROJECT_ARTIFACTID>
+ <MVN_PROJECT_VERSION>${project.version}</MVN_PROJECT_VERSION>
+ <MVN_NEXUSPROXY>${onap.nexus.url}</MVN_NEXUSPROXY>
+ <MVN_RAWREPO_BASEURL_UPLOAD>${onap.nexus.rawrepo.baseurl.upload}</MVN_RAWREPO_BASEURL_UPLOAD>
+ <MVN_RAWREPO_BASEURL_DOWNLOAD>${onap.nexus.rawrepo.baseurl.download}</MVN_RAWREPO_BASEURL_DOWNLOAD>
+ <MVN_RAWREPO_SERVERID>${onap.nexus.rawrepo.serverid}</MVN_RAWREPO_SERVERID>
+ <PLUGIN_NAME>${plugin.name}</PLUGIN_NAME>
+ <PLUGIN_SUBDIR>${plugin.subdir}</PLUGIN_SUBDIR>
+ </environmentVariables>
+ </configuration>
+ </execution>
+
+ <execution>
+ <id>test script</id>
+ <phase>test</phase>
+ <goals><goal>exec</goal></goals>
+ <configuration>
+ <executable>mvn-phase-script.sh</executable>
+ <arguments>
+ <argument>${project.artifactId}</argument>
+ <argument>test</argument>
+ </arguments>
+ <environmentVariables>
+ <!-- make mvn properties as env for our script -->
+ <MVN_PROJECT_GROUPID>${project.groupId}</MVN_PROJECT_GROUPID>
+ <MVN_PROJECT_ARTIFACTID>${project.artifactId}</MVN_PROJECT_ARTIFACTID>
+ <MVN_PROJECT_VERSION>${project.version}</MVN_PROJECT_VERSION>
+ <MVN_NEXUSPROXY>${onap.nexus.url}</MVN_NEXUSPROXY>
+ <MVN_RAWREPO_BASEURL_UPLOAD>${onap.nexus.rawrepo.baseurl.upload}</MVN_RAWREPO_BASEURL_UPLOAD>
+ <MVN_RAWREPO_BASEURL_DOWNLOAD>${onap.nexus.rawrepo.baseurl.download}</MVN_RAWREPO_BASEURL_DOWNLOAD>
+ <MVN_RAWREPO_SERVERID>${onap.nexus.rawrepo.serverid}</MVN_RAWREPO_SERVERID>
+ <PLUGIN_NAME>${plugin.name}</PLUGIN_NAME>
+ <PLUGIN_SUBDIR>${plugin.subdir}</PLUGIN_SUBDIR>
+ </environmentVariables>
+ </configuration>
+ </execution>
+
+ <execution>
+ <id>install script</id>
+ <phase>install</phase>
+ <goals><goal>exec</goal></goals>
+ <configuration>
+ <executable>mvn-phase-script.sh</executable>
+ <arguments>
+ <argument>${project.artifactId}</argument>
+ <argument>install</argument>
+ </arguments>
+ <environmentVariables>
+ <!-- make mvn properties as env for our script -->
+ <MVN_PROJECT_GROUPID>${project.groupId}</MVN_PROJECT_GROUPID>
+ <MVN_PROJECT_ARTIFACTID>${project.artifactId}</MVN_PROJECT_ARTIFACTID>
+ <MVN_PROJECT_VERSION>${project.version}</MVN_PROJECT_VERSION>
+ <MVN_NEXUSPROXY>${onap.nexus.url}</MVN_NEXUSPROXY>
+ <MVN_RAWREPO_BASEURL_UPLOAD>${onap.nexus.rawrepo.baseurl.upload}</MVN_RAWREPO_BASEURL_UPLOAD>
+ <MVN_RAWREPO_BASEURL_DOWNLOAD>${onap.nexus.rawrepo.baseurl.download}</MVN_RAWREPO_BASEURL_DOWNLOAD>
+ <MVN_RAWREPO_SERVERID>${onap.nexus.rawrepo.serverid}</MVN_RAWREPO_SERVERID>
+ </environmentVariables>
+ </configuration>
+ </execution>
+
+ <execution>
+ <id>deploy script</id>
+ <phase>deploy</phase>
+ <goals><goal>exec</goal></goals>
+ <configuration>
+ <executable>${session.executionRootDirectory}/mvn-phase-script.sh</executable>
+ <arguments>
+ <argument>${project.artifactId}</argument>
+ <argument>deploy</argument>
+ </arguments>
+ <environmentVariables>
+ <!-- make mvn properties as env for our script -->
+ <MVN_PROJECT_GROUPID>${project.groupId}</MVN_PROJECT_GROUPID>
+ <MVN_PROJECT_ARTIFACTID>${project.artifactId}</MVN_PROJECT_ARTIFACTID>
+ <MVN_PROJECT_VERSION>${project.version}</MVN_PROJECT_VERSION>
+ <MVN_NEXUSPROXY>${onap.nexus.url}</MVN_NEXUSPROXY>
+ <MVN_RAWREPO_BASEURL_UPLOAD>${onap.nexus.rawrepo.baseurl.upload}</MVN_RAWREPO_BASEURL_UPLOAD>
+ <MVN_RAWREPO_BASEURL_DOWNLOAD>${onap.nexus.rawrepo.baseurl.download}</MVN_RAWREPO_BASEURL_DOWNLOAD>
+ <MVN_RAWREPO_SERVERID>${onap.nexus.rawrepo.serverid}</MVN_RAWREPO_SERVERID>
+ <MVN_SERVER_ID>${project.distributionManagement.snapshotRepository.id}</MVN_SERVER_ID>
+ <TYPE_FILE_SOURCE>${typefile.source}</TYPE_FILE_SOURCE>
+ <TYPE_FILE_DEST>${typefile.dest}</TYPE_FILE_DEST>
+ <PLUGIN_NAME>${plugin.name}</PLUGIN_NAME>
+ <PLUGIN_SUBDIR>${plugin.subdir}</PLUGIN_SUBDIR>
+ </environmentVariables>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/dmaap/requirements.txt b/dmaap/requirements.txt
new file mode 100644
index 0000000..54ffbc4
--- /dev/null
+++ b/dmaap/requirements.txt
@@ -0,0 +1,3 @@
+python-consul>=0.7.0
+requests
+cloudify-common>=5.0.5
diff --git a/dmaap/setup.py b/dmaap/setup.py
new file mode 100644
index 0000000..1bdb14f
--- /dev/null
+++ b/dmaap/setup.py
@@ -0,0 +1,36 @@
+# ============LICENSE_START====================================================
+# =============================================================================
+# Copyright (c) 2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2020 Pantheon.tech. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+from setuptools import setup, find_packages
+
+setup(
+ name = "dmaap",
+ version = "1.5.0",
+ packages=find_packages(),
+ author = "AT&T",
+ description = ("Cloudify plugin for creating DMaaP feeds and topics, and setting up publishers and subscribers."),
+ license = "",
+ keywords = "",
+ url = "",
+ zip_safe=False,
+ install_requires=[
+ 'python-consul>=0.7.0',
+ 'requests',
+ 'cloudify-common>=5.0.5',
+ ],
+)
diff --git a/dmaap/tests/conftest.py b/dmaap/tests/conftest.py
new file mode 100644
index 0000000..9ae7b40
--- /dev/null
+++ b/dmaap/tests/conftest.py
@@ -0,0 +1,88 @@
+# ============LICENSE_START=======================================================
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+
+import pytest
+
+import requests
+
+@pytest.fixture()
+def mockconsul(monkeypatch):
+ """ Override the regular Consul interface"""
+ def fake_get_config(self, key):
+ config={'dmaap': {
+ 'username': 'testuser@dmaaptest.example.com',
+ 'url': 'https://dmaaptest.example.com:8443/webapi',
+ 'password' : 'testpassword',
+ 'owner': 'dcaeorch'
+ }}
+ return config
+
+ def fake_get_service(self, service_name):
+ service_address = "myAddress"
+ service_port= "8443"
+ return service_address, service_port
+
+ def fake_add_to_entry(self, key, add_name, add_value):
+ return True
+
+ def fake_delete_entry(self, entry_name):
+ return True
+
+ def fake_init(self, api_url, user, password, logger):
+ pass
+
+ from consulif.consulif import ConsulHandle
+ monkeypatch.setattr(ConsulHandle, 'get_config', fake_get_config)
+ monkeypatch.setattr(ConsulHandle, 'get_service', fake_get_service)
+ monkeypatch.setattr(ConsulHandle, 'add_to_entry', fake_add_to_entry)
+ monkeypatch.setattr(ConsulHandle, 'delete_entry', fake_delete_entry)
+ monkeypatch.setattr(ConsulHandle, '__init__', fake_init)
+
+ def get_handle():
+ return ConsulHandle('mockconsul', None, None, None)
+ return get_handle
+
+
+@pytest.fixture()
+def mockdmaapbc(monkeypatch):
+
+ def fake_get(url, auth):
+ # print "fake_get: {0}, {1}".format(url, auth)
+ r = requests.Response()
+ r.status_code = 200
+ return r
+ def fake_post(url, auth, json):
+ # print "fake_post: {0}, {1}, {2}".format(url, auth, json)
+ r = requests.Response()
+ r.status_code = 200
+ return r
+ def fake_delete(url, auth):
+ # print "fake_delete: {0}, {1}".format(url, auth)
+ r = requests.Response()
+ r.status_code = 200
+ return r
+ def fake_json(self):
+ return {"fqtn":"test_fqtn"}
+
+ import requests
+ monkeypatch.setattr(requests.Response, "json", fake_json)
+ monkeypatch.setattr(requests, "get", fake_get)
+ monkeypatch.setattr(requests, "post", fake_post)
+ monkeypatch.setattr(requests, "delete", fake_delete)
+
diff --git a/dmaap/tests/test_consulif.py b/dmaap/tests/test_consulif.py
new file mode 100644
index 0000000..a45c6a4
--- /dev/null
+++ b/dmaap/tests/test_consulif.py
@@ -0,0 +1,72 @@
+# ============LICENSE_START=======================================================
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+
+
+import pytest
+from cloudify.exceptions import NonRecoverableError
+import os
+from consulif.consulif import ConsulHandle
+
+
+# No connections are actually made to this host
+CONSUL_HOST = "consul" # Should always be a local consul agent on Cloudify Manager
+#CONSUL_PORT = '8510'
+CONSUL_PORT = '8500'
+DBCL_KEY_NAME = "dmaap_dbcl_info" # Consul key containing DMaaP data bus credentials
+DBC_SERVICE_NAME= "dmaap_bus_controller" # Name under which the DMaaP bus controller is registered
+
+
+def test_get_config_service(mockconsul):
+ err_msg = "Error getting ConsulHandle when configuring dmaap plugin: {0}"
+ _ch = ConsulHandle("http://{0}:{1}".format(CONSUL_HOST, CONSUL_PORT), None, None, None)
+
+ config = _ch.get_config(DBCL_KEY_NAME)
+
+ DMAAP_USER = config['dmaap']['username']
+ DMAAP_PASS = config['dmaap']['password']
+ DMAAP_OWNER = config['dmaap']['owner']
+
+ if 'protocol' in config['dmaap']:
+ DMAAP_PROTOCOL = config['dmaap']['protocol']
+ else:
+ DMAAP_PROTOCOL = 'https' # Default to https (service discovery should give us this but doesn't
+
+ if 'path' in config['dmaap']:
+ DMAAP_PATH = config['dmaap']['path']
+ else:
+ DMAAP_PATH = 'webapi' # Should come from service discovery but Consul doesn't support it
+
+ service_address, service_port = _ch.get_service(DBC_SERVICE_NAME)
+
+ DMAAP_API_URL = '{0}://{1}:{2}/{3}'.format(DMAAP_PROTOCOL, service_address, service_port, DMAAP_PATH)
+
+
+def test_add_entry(mockconsul):
+ _ch = ConsulHandle("http://{0}:{1}".format(CONSUL_HOST, CONSUL_PORT), None, None, None)
+
+ key = 'DMAAP_TEST'
+ name = 'dmaap_test_name'
+ value = 'dmaap_test_value'
+ _ch.add_to_entry(key, name, value)
+
+ name = "dmaap_test_name_2"
+ value = 'dmaap_test_value_2'
+ _ch.add_to_entry(key, name, value)
+
+ _ch.delete_entry(key)
diff --git a/dmaap/tests/test_dmaapcontrollerif.py b/dmaap/tests/test_dmaapcontrollerif.py
new file mode 100644
index 0000000..25ddb88
--- /dev/null
+++ b/dmaap/tests/test_dmaapcontrollerif.py
@@ -0,0 +1,113 @@
+# ============LICENSE_START=======================================================
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2020 Pantheon.tech. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+
+import pytest
+import requests
+from cloudify.mocks import MockCloudifyContext
+from cloudify.state import current_ctx
+from cloudify import ctx
+from cloudify.decorators import operation
+from cloudify.exceptions import NonRecoverableError
+
+
+import test_consulif
+from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
+
+import logging
+logger = logging.getLogger("test_mr_lifecycle")
+
+_goodosv2 = {
+ 'auth_url': 'https://example.com/identity/v2.0',
+ 'password': 'pw',
+ 'region': 'r',
+ 'tenant_name': 'tn',
+ 'username': 'un'
+}
+
+
+def test_dmaapc (monkeypatch, mockconsul, mockdmaapbc):
+ from dmaapplugin.dmaaputils import random_string
+
+ config = mockconsul().get_config('mockkey')['dmaap']
+ DMAAP_API_URL = config['url']
+ DMAAP_USER = config['username']
+ DMAAP_PASS = config['password']
+ DMAAP_OWNER = config['owner']
+
+ properties = {'fqdn': 'a.x.example.com', 'openstack': _goodosv2 }
+ mock_ctx = MockCloudifyContext(
+ node_id='test_node_id',
+ node_name='test_node_name',
+ properties=properties,
+ runtime_properties = {
+ "admin": { "user": "admin_user" },
+ "user": { "user": "user_user" },
+ "viewer": { "user": "viewer_user" }
+ })
+
+ current_ctx.set(mock_ctx)
+
+ kwargs = { "topic_name": "ONAP_test",
+ "topic_description": "onap dmaap plugin unit test topic"}
+
+ # Make sure there's a topic_name
+ if "topic_name" in ctx.node.properties:
+ topic_name = ctx.node.properties["topic_name"]
+ if topic_name == '' or topic_name.isspace():
+ topic_name = random_string(12)
+ else:
+ topic_name = random_string(12)
+
+ # Make sure there's a topic description
+ if "topic_description" in ctx.node.properties:
+ topic_description = ctx.node.properties["topic_description"]
+ else:
+ topic_description = "No description provided"
+
+ # ..and the truly optional setting
+ if "txenable" in ctx.node.properties:
+ txenable = ctx.node.properties["txenable"]
+ else:
+ txenable= False
+
+ if "replication_case" in ctx.node.properties:
+ replication_case = ctx.node.properties["replication_case"]
+ else:
+ replication_case = None
+
+ if "global_mr_url" in ctx.node.properties:
+ global_mr_url = ctx.node.properties["global_mr_url"]
+ else:
+ global_mr_url = None
+
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ ctx.logger.info("Attempting to create topic name {0}".format(topic_name))
+ t = dmc.create_topic(topic_name, topic_description, txenable, DMAAP_OWNER, replication_case, global_mr_url)
+
+ # Capture important properties from the result
+ topic = t.json()
+ ctx.instance.runtime_properties["fqtn"] = topic["fqtn"]
+
+ # test DMaaPControllerHandle functions
+ path = "myPath"
+ url = dmc._make_url(path)
+ rc = dmc._get_resource(path)
+ rc = dmc._create_resource(path, None)
+ rc = dmc._delete_resource(path)
diff --git a/dmaap/tests/test_dr_lifecycle.py b/dmaap/tests/test_dr_lifecycle.py
new file mode 100644
index 0000000..2aa65e8
--- /dev/null
+++ b/dmaap/tests/test_dr_lifecycle.py
@@ -0,0 +1,65 @@
+# ============LICENSE_START=======================================================
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2020 Pantheon.tech. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+
+import pytest
+import requests
+from cloudify.mocks import MockCloudifyContext
+from cloudify.state import current_ctx
+from cloudify import ctx
+from cloudify.decorators import operation
+from cloudify.exceptions import NonRecoverableError
+from cloudify.exceptions import RecoverableError
+
+_goodosv2 = {
+ 'auth_url': 'https://example.com/identity/v2.0',
+ 'password': 'pw',
+ 'region': 'r',
+ 'tenant_name': 'tn',
+ 'username': 'un'
+}
+
+
+def test_create_feed(monkeypatch, mockconsul, mockdmaapbc):
+ import dmaapplugin
+ from dmaapplugin import dr_lifecycle
+
+ properties = {'fqdn': 'a.x.example.com', 'openstack': _goodosv2, 'feed_id': 'test_feed_id' }
+ mock_ctx = MockCloudifyContext(
+ node_id='test_node_id',
+ node_name='test_node_name',
+ properties=properties,
+ runtime_properties = {
+ "admin": { "user": "admin_user" },
+ "user": { "user": "user_user" },
+ "viewer": { "user": "viewer_user" }
+ })
+
+ current_ctx.set(mock_ctx)
+
+ kwargs = { "feed_name": "ONAP_test",
+ "feed_description": "onap dmaap plugin unit test feed"}
+
+ def fake_feed(self):
+ return {"feedId":"test_feedId", "publishURL":"test_publishURL", "logURL":"test_logURL" }
+ monkeypatch.setattr(requests.Response, "json", fake_feed)
+
+ dr_lifecycle.create_feed(**kwargs)
+ dr_lifecycle.get_existing_feed(**kwargs)
+ dr_lifecycle.delete_feed(**kwargs)
diff --git a/dmaap/tests/test_mr_lifecycle.py b/dmaap/tests/test_mr_lifecycle.py
new file mode 100644
index 0000000..4a6a583
--- /dev/null
+++ b/dmaap/tests/test_mr_lifecycle.py
@@ -0,0 +1,59 @@
+# ============LICENSE_START=======================================================
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2020 Pantheon.tech. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+import pytest
+import requests
+from cloudify.mocks import MockCloudifyContext
+from cloudify.state import current_ctx
+from cloudify import ctx
+from cloudify.decorators import operation
+from cloudify.exceptions import NonRecoverableError
+from cloudify.exceptions import RecoverableError
+
+_goodosv2 = {
+ 'auth_url': 'https://example.com/identity/v2.0',
+ 'password': 'pw',
+ 'region': 'r',
+ 'tenant_name': 'tn',
+ 'username': 'un'
+}
+
+
+def test_create_topic(monkeypatch, mockconsul, mockdmaapbc):
+ import dmaapplugin
+ from dmaapplugin import mr_lifecycle
+ properties = {'fqdn': 'a.x.example.com', 'openstack': _goodosv2, 'fqtn': 'test_fqtn' }
+ mock_ctx = MockCloudifyContext(
+ node_id='test_node_id',
+ node_name='test_node_name',
+ properties=properties,
+ runtime_properties = {
+ "admin": { "user": "admin_user" },
+ "user": { "user": "user_user" },
+ "viewer": { "user": "viewer_user" }
+ })
+
+ current_ctx.set(mock_ctx)
+
+ kwargs = { "topic_name": "ONAP_test",
+ "topic_description": "onap dmaap plugin unit test topic"}
+
+ mr_lifecycle.create_topic(**kwargs)
+ mr_lifecycle.get_existing_topic(**kwargs)
+ mr_lifecycle.delete_topic(**kwargs)
diff --git a/dmaap/tests/test_plugin.py b/dmaap/tests/test_plugin.py
new file mode 100644
index 0000000..e2a8586
--- /dev/null
+++ b/dmaap/tests/test_plugin.py
@@ -0,0 +1,26 @@
+# ============LICENSE_START====================================================
+# org.onap.dcaegen2
+# =============================================================================
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+import pytest
+import requests
+from cloudify.mocks import MockCloudifyContext
+from cloudify.state import current_ctx
+from cloudify.exceptions import NonRecoverableError
+
+def test_noop():
+ pass
diff --git a/dmaap/tests/test_utils.py b/dmaap/tests/test_utils.py
new file mode 100644
index 0000000..362948d
--- /dev/null
+++ b/dmaap/tests/test_utils.py
@@ -0,0 +1,26 @@
+# ============LICENSE_START=======================================================
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+
+import pytest
+
+
+def test_random_string(monkeypatch):
+ from dmaapplugin import dmaaputils
+ target_length = 10
+ assert len(dmaaputils.random_string(target_length)) == target_length
diff --git a/dmaap/tox.ini b/dmaap/tox.ini
new file mode 100644
index 0000000..5bcede7
--- /dev/null
+++ b/dmaap/tox.ini
@@ -0,0 +1,36 @@
+# ============LICENSE_START====================================================
+# org.onap.dcaegen2
+# =============================================================================
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2020 Pantheon.tech. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+[tox]
+envlist = py27,py36,py37,py38
+skip_missing_interpreters = true
+
+[testenv]
+deps=
+ pytest
+ coverage
+ pytest-cov
+ -rrequirements.txt
+setenv =
+ PYTHONPATH={toxinidir}
+commands=
+ pytest --junitxml xunit-results.xml --cov dmaapcontrollerif --cov consulif --cov dmaapplugin --cov-report xml
+ coverage xml
+ coverage report
+ coverage html