diff options
author | Jack Lucas <jflucas@research.att.com> | 2017-09-01 13:48:08 +0000 |
---|---|---|
committer | Jack Lucas <jflucas@research.att.com> | 2017-09-01 13:50:18 +0000 |
commit | 0619cb3772a12e0a5a3aecc09b96eeb7df20a000 (patch) | |
tree | b8a921e0e41578163f7c2db41450301c374361df | |
parent | ac3f029edc6bc8b25ab44666b5408c8929e6ba92 (diff) |
Add seed code for DMaaP plugin
Change-Id: I8c7a9c432badd3052a571ed87b9b580760b376e6
Issue-Id: CCSDK-65
Signed-off-by: Jack Lucas <jflucas@research.att.com>
-rw-r--r-- | dmaap/.gitignore | 69 | ||||
-rw-r--r-- | dmaap/LICENSE.txt | 17 | ||||
-rw-r--r-- | dmaap/README.md | 322 | ||||
-rw-r--r-- | dmaap/consulif/__init__.py | 0 | ||||
-rw-r--r-- | dmaap/consulif/consulif.py | 120 | ||||
-rw-r--r-- | dmaap/dmaap.yaml | 193 | ||||
-rw-r--r-- | dmaap/dmaapcontrollerif/__init__.py | 1 | ||||
-rw-r--r-- | dmaap/dmaapcontrollerif/dmaap_requests.py | 256 | ||||
-rw-r--r-- | dmaap/dmaapplugin/__init__.py | 46 | ||||
-rw-r--r-- | dmaap/dmaapplugin/dmaaputils.py | 28 | ||||
-rw-r--r-- | dmaap/dmaapplugin/dr_bridge.py | 198 | ||||
-rw-r--r-- | dmaap/dmaapplugin/dr_lifecycle.py | 121 | ||||
-rw-r--r-- | dmaap/dmaapplugin/dr_relationships.py | 211 | ||||
-rw-r--r-- | dmaap/dmaapplugin/mr_lifecycle.py | 121 | ||||
-rw-r--r-- | dmaap/dmaapplugin/mr_relationships.py | 119 | ||||
-rw-r--r-- | dmaap/requirements.txt | 1 | ||||
-rw-r--r-- | dmaap/setup.py | 16 | ||||
-rw-r--r-- | dmaap/tests/test_plugin.py | 26 | ||||
-rw-r--r-- | dmaap/tox.ini | 26 |
19 files changed, 1891 insertions, 0 deletions
diff --git a/dmaap/.gitignore b/dmaap/.gitignore new file mode 100644 index 0000000..af8e550 --- /dev/null +++ b/dmaap/.gitignore @@ -0,0 +1,69 @@ +.project +.pydevproject +wheels +venv +cdap.zip +docker.zip +*.wgn +*.swp +*.swn +*.swo +.DS_Store + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] + +# C extensions +*.so + +# Distribution / packaging +.Python +env/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +*.egg-info/ +.installed.cfg +*.egg + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*,cover + +# Translations +*.mo +*.pot + +# Django stuff: +*.log + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ diff --git a/dmaap/LICENSE.txt b/dmaap/LICENSE.txt new file mode 100644 index 0000000..f90f8f1 --- /dev/null +++ b/dmaap/LICENSE.txt @@ -0,0 +1,17 @@ +============LICENSE_START======================================================= +org.onap.ccsdk +================================================================================ +Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +================================================================================ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +============LICENSE_END========================================================= diff --git a/dmaap/README.md b/dmaap/README.md new file mode 100644 index 0000000..646400f --- /dev/null +++ b/dmaap/README.md @@ -0,0 +1,322 @@ +## Cloudify DMaaP Plugin +Cloudify plugin for creating and managing DMaaP Data Router feeds and subscriptions and +DMaaP Message Router topics. The plugin uses the DMaaP Bus Controller API. + +### Plugin Support for DMaaP Data Router +#### Plugin Types for DMaaP Data Router +The Cloudify type definitions for DMaaP Data Router nodes and relationships +are defined in [`dmaap.yaml`](./dmaap.yaml). + +There are four node types for DMaaP Data Router: + +- `ccsdk.nodes.Feed`: This type represents a feed that does not yet +exist and that should be created when the install workflow is +run against a blueprint that contains a node of this type. + +Property|Type|Required?|Description | +--------|----|---------|--------------------------------------- +feed_name|string|no|a name that identifies the feed (plugin will generate if absent) +feed_version|string|no|version number for the feed (feed_name + feed_version uniquely identify the feed in DR) +feed_description|string|no|human-readable description of the feed +aspr_classification|string|no|AT&T ASPR classification of the feed + + +- `ccsdk.nodes.ExistingFeed`: This type represents a feed that +already exists. Nodes of this type are placed in a blueprint so +that other nodes in the blueprint can be set up as publishers or +subscribers to the feed. The table below shows the properties that a node +of this type may have. + +Property|Type|Required?|Description +--------|----|---------|---------------------------------------- +feed_id|string|yes|Feed identifier assigned by DMaaP when the feed was created + +- `ccsdk.nodes.ExternalTargetFeed`: This type represents a feed created in an external DMaaP +environment (i.e., an environment that the plugin cannot access to make provisioning requests, such as +a shared corporate system). Nodes of this type are placed in a blueprint so that other feed nodes of +type `ccsdk.nodes.Feed` or `ccsdk.nodes.ExistingFeed` can be set up to "bridge" to external feeds by +publishing data to the external feeds. The table below shows the properties that a node of this type +may have. + +Property|Type|Required?|Description +--------|----|---------|---------------------------------------- +url|string|yes|The publish URL of the external feed. +username|string|yes|The username to be used when delivering to the external feed +userpw|string|yes|The password to be used when delivering to the external feed + +_Note: These properties are usually obtained by manually creating a feed in the external +DMaaP DR system and then creating a publisher for that feed._ + +- `ccsdk.nodes.ExternalSourceFeed`: This type represents a feed created in an external DMaaP +environment (i.e., an environment that the plugin cannot access to makes provisioning requests, such as +a shared corporate system). Nodes of this type are place in a blueprint so that they can be set up to +"bridge" to other feed nodes of type `ccsdk.nodes.Feed` or `ccsdk.nodes.ExistingFeed`. This type +has no node properties, but when a bridge is set up, the url, username, and password are attached to the +node as runtime_properties, using the name of the target feed node as the top-level key. + +There are five relationship types for DMaaP Data Router: + +- `ccsdk.relationships.publish_files`, used to +indicate that the relationship's source node sends is a publisher to the +Data Router feed represented by the relationship's target node. +- `ccsdk.relationships.subscribe_to_files`, used to +indicate that the relationship's source node is a subscriber to the +Data Router feed represented by the relationship's target node. +- `ccsdk.relationships.bridges_to`, used to indicate that the relationship's source +node (a `ccsdk.nodes.Feed` or `ccsdk.nodes.ExistingFeed`) should be set up +to forward data ("bridge") to the relationship's target feed (another `ccsdk.nodes.Feed` or +`ccsdk.nodes.ExistingFeed`). +- `ccsdk.relationships.bridges_to_external`, used to indicate that the relationship's source +node (a `ccsdk.nodes.Feed` or `ccsdk.nodes.ExistingFeed`) should be set up +to forward data ("bridge") to the relationship's target node (a feed in an external DMaaP system, +represented by a `ccsdk.nodes.ExternalTargetFeed` node). +- `ccsdk.relationships.bridges_from_external_to_internal`, used to indicate the the relationship's source +node (a feed in an external DMaaP system, represented by a `ccsdk.nodes.ExternalSourceFeed` node) should be set up to forward date ("bridge") +to the relationship's target node (an internal ONAP feed, represented by a `ccsdk.nodes.Feed` or `ccsdk.nodes.ExistingFeed` node). + +The plugin code implements the lifecycle operations needed to create and +delete feeds and to add and remove publishers and subscribers. It also implements +the operations needed to set up bridging between feeds. + +#### Interaction with Other Plugins +When creating a new feed or processing a reference to an existing feed, +the plugin operates independently of other plugins. + +When processing a `ccsdk.relationships.publish_files` relationship or a +`ccsdk.relationships.subscribe_to_files` relationship, this plugin needs +to obtain data from the source node and, in the case of `publish_files`, provide +data to the source node. Certain conventions are therefore needed for +passing data between this plugin and the plugins responsible for the source +nodes in these relationships. In Cloudify, the mechanism for +sharing data among plugins is the `ctx.instance.runtime_properties` dictionary +associated with each node. + +A given source node may have relationships with several feeds. For example, an ONAP DCAE +data collector might publish two different types of data to two different feeds. An ONAP DCAE +analytics module might subscribe to one feed to get input for its processing and +publish its results to a different feed. When this DMaaP plugin and the plugin for the +source node exchange information, they need to do in a way that lets them distinguish +among different feeds. We do this through a simple convention: for each source node +to feed relationship, the source node plugin will create a property in the source node's +`runtime_properties` dictionary. The name of the property will be the same as the +name of the target node of the relationship. For instance, if a node has a +`publishes_files` relationship with a target node named `feed00`, then the plugin that's +responsible for managing the source node with create an entry in the source node's +`runtime_properties` dictionary named `feed00`. This entry itself will be a dictionary. + +The content of this data exchange dictionary depends on whether the source node is a +publisher (i.e., the relationship is `publish_files`) or a subscriber (i.e., the +relationship is `subscribe_to_files`). + +For the `publish_files` relationship, the data exchange dictionary has the following +properties: + +Property|Set by|Description +--------|------|------------------------------------------------ +location|source node plugin|the DMaaP location for the publisher, used to set up routing +publish_url|DMaaP plugin|the URL to which the publisher makes Data Router publish requests +log_url|DMaaP plugin|the URL from which log data for the feed can be obtained +username|DMaaP plugin|the username (generated by the DMaaP plugin) the publisher uses to authenticate to Data Router +password|DMaaP plugin|the password (generated by the DMaaP plugin) the publisher uses to authenticate to Data Router + +For the `subscribe_to_files` relationship, the data exchange dictionary has the following +properties: + +Property|Set by|Description +--------|------|------------------------------------------------ +location|source node plugin|the DMaaP location for the subscriber, used to set up routing +delivery_url|source node plugin|the URL to which the Data Router should deliver files +username|source node plugin|the username Data Router uses to authenticate to the subscriber when delivering files +password|source node plugin|the username Data Router uses to authenticate to the subscriber when delivering file + +### Plugin Support for DMaaP Message Router +#### Plugin Types for DMaaP Message Router +The Cloudify type definitions for DMaaP Message Router nodes and relationships +are defined in [`dmaap.yaml`](./dmaap.yaml). + +There are two node types for DMaaP Message Router: + +- `ccsdk.nodes.Topic`: This type represents a topic that does not yet +exist and that should be created when the install workflow is +run against a blueprint that contains a node of this type. + +Property|Type|Required?|Description +--------|----|---------|--------------------------------------- +topic_name|string|no|a name that uniquely identifies the feed (plugin will generate if absent) +topic_description|string|no|human-readable description of the feed +txenable|boolean|no|flag indicating whether transactions are enabled for this topic +replication_case|string|no|type of replication required for the topic (defaults to no replication) +global_mr_url|string|no|Global MR host name for replication to a global MR instance + +Note: In order to set up topics, a user should be familiar with message router and how it is configured, +and this README is not the place to explain the details of message router. Here are a couple of pieces of +information that might be helpful. +Currently, the allowed values for `replication_case` are: + +- `REPLICATION_NONE` +- `REPLICATION_EDGE_TO_CENTRAL` +- `REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL` +- `REPLICATION_CENTRAL_TO_EDGE` +- `REPLICATION_CENTRAL_TO_GLOBAL` +- `REPLICATION_GLOBAL_TO_CENTRAL` +- `REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE` + +The `global_mr_url` is actually a host name, not a full URL. It points to a host in a global message router +cluster. (A 'global' message router cluster is one that's not part of ONAP.) + +- `ccsdk.nodes.ExistingTopic`: This type represents a topic that +already exists. Nodes of this type are placed in a blueprint so +that other nodes in the blueprint can be set up as publishers or +subscribers to the topic. The table below shows the properties that a node +of this type may have. + +Property|Type|Required?|Description +--------|----|---------|---------------------------------------- +fqtn|string|yes|fully-qualified topic name for the topic + +#### Interaction with Other Plugins +When creating a new topic or processing a reference to an existing topic, +the plugin operates independently of other plugins. + +When processing a `ccsdk.relationships.publish_events` relationship or a +`ccsdk.relationships.subscribe_to_events` relationship, this plugin needs +to obtain data from and provide data to the source node. Certain conventions are therefore needed for +passing data between this plugin and the plugins responsible for the source +nodes in these relationships. In Cloudify, the mechanism for +sharing data among plugins is the `ctx.instance.runtime_properties` dictionary +associated with each node. + +A given source node may have relationships with several topics. For example, an ONAP DCAE +analytics module might subscribe to one topic to get input for its processing and +publish its results to a different topic. When this DMaaP plugin and the plugin for the +source node exchange information, they need to do in a way that lets them distinguish +among different feeds. We do this through a simple convention: for each source node +to topic relationship, the source node plugin will create a property in the source node's +`runtime_properties` dictionary. The name of the property will be the same as the +name of the target node of the relationship. For instance, if a node has a +`publishes_events` relationship with a target node named `topic00`, then the plugin that's +responsible for managing the source node with create an entry in the source node's +`runtime_properties` dictionary named `topic00`. This entry itself will be a dictionary. + +For both types of relationship, the data exchange dictionary has the following +properties: + +Property|Set by|Description +--------|------|------------------------------------------------ +location|source node plugin|the DMaaP location for the publisher or subscriber, used to set up routing +client_role|source node plugin|the AAF client role that's requesting publish or subscribe access to the topic +topic_url|DMaaP plugin|the URL for accessing the topic to publish or receive events + +### Interaction with Consul configuration store +In addition to storing the results of DMaaP Data Router and DMaaP Message Router provisioning operations in `runtime_properties`, +the DMaaP plugin also stores these results into the ONAP configuration store, which resides in a +[Consul key-value store](https://www.consul.io/). This allows DMaaP clients (components that act as publishers, subscribers, or both) +to retrieve their DMaaP configuration information from Consul, rather than having the plugin that deploys the client directly +configure the client using data in `runtime_properties`. + +The `runtime_properties` for a client must contain a property called `service_component_name`. If this property is not present, +the plugin will raise a NonRecoverableError and cause the installation to fail. + +If `service_component_name` is present, then the plugin will use a Consul key consisting of the value +of `service_component_name` prepended to the fixed string `:dmaap`. For example, if the `service_component_name` +is `client123`, the plugin will use `client123:dmaap` as the key for storing DMaaP information into Consul. +Information for all of the feeds and topics for a client are stored under the same key. + +The value stored is a nested JSON object. At the top level of the object are properties representing each topic or feed +for which the component is a publisher or subscriber. The name of the property is the node name of the target feed or topic. +The value of the property is another JSON object that corresponds to the dictionary that the plugin created in +`runtime_properties` corresponding to the target feed or topic. Note that the information in Consul includes +all of the properties for the feed or topic, those set by the source node plugin as well as those set by the DMaaP plugin. + +Examples: + +Data Router publisher, target feed `feed00`: +``` +{ + "feed00": { + "username": "rC9QR51I", + "log_url": "https://dmaap.example.com/feedlog/972", + "publish_url": "https://dmaap.example.com/publish/972", + "location": "loc00", + "password": "QOQeUh5KLR", + "publisher_id": "972.360gm" + } +} +``` + +Data Router subscriber, target feed `feed01`: +``` +{ + "feed01": { + "username": "drdeliver", + "password": "1loveDataR0uter", + "location": "loc00", + "delivery_url": "https://example.com/whatever", + "subscriber_id": "1550" + } +} +``` + +Message Router publisher to `topic00`, subscriber to `topic01`. Note how each topic +appears as a top-level property in the object. +``` +{ + "topic00": { + "topic_url": "https://dmaap.example.com:3905/events/org.onap.ccsdk.dmaap.FTL2.outboundx", + "client_role": "org.onap.ccsdk.member", + "location": "loc00", + "client_id": "1494621774522" + }, + "topic01": { + "topic_url": "https://dmaap.example.com:3905/events/org.onap.ccsdk.dmaap.FTL2.inboundx", + "client_role": "org.onap.ccsdk.member", + "location": "loc00", + "client_id": "1494621778627" + } +} +``` + +### Packaging and installing +The DMaaP plugin is meant to be used as a [Cloudify managed plugin](http://docs.getcloudify.org/3.4.0/plugins/using-plugins/). Managed plugins +are packaged using [`wagon`](https://github.com/cloudify-cosmo/wagon). + +To package this plugin, executing the following command in the top-level directory of this plugin, from a Python environment in which `wagon` has been installed: +``` +wagon create -s . -r -o /path/to/directory/for/wagon/output +``` +Once the wagon file is built, it can be uploaded to a Cloudify Manager host using the `cfy plugins upload` command described in the documentation above. + +Managed plugins can also be loaded at the time a Cloudify Manager host is installed, via the installation blueprint and inputs file. We expect that this plugin will +be loaded at Cloudify Manager installation time, and that `cfy plugins upload` will be used only for delivering patches between releases. + +### Configuration +The plugin needs to be configured with certain parameters needed to access the DMaaP Bus Controller. In keeping with the ONAP architecture, this information is +stored in Consul. + +The plugin finds the address and port of the DMaaP Bus Controller using the Consul service discovery facility. The plugin expects the Bus Controller to be +registered under the name `dmaap_bus_controller`. + +Additional parameters come from the `dmaap` key in the Cloudify Manager's Consul configuration, which is stored in the Consul KV store under the key name +'cloudify_manager'. The table below lists the properties in the configuration: + +Property|Type|Required?|Default|Description +--------|----|---------|-------|-------------------------------- +`username`|string|Yes|(none)|The username for logging into DMaaP Bus Controller +`password`|string|Yes|(none)|The password for logging into DMaaP Bus Controller +`owner`|string|Yes|(none)|The name to be used as the owner for entities created by the plugin +`protocol`|string|No|`https`|The protocol (URL scheme) used to access the DMaaP bus controller (`http` or `https`) +`path`|string|No|`webapi`|The path to the root of the DMaaP Bus Controller API endpoint + +Here is an example of a Cloudify Manager configuration object showing only the `dmaap` key: +``` +{ + "dmaap": { + "username": "dmaap.client@ccsdkorch.onap.org", + "password": "guessmeifyoucan" + "owner": "ccsdkorc" + }, + + (other configuration here) + +} +``` diff --git a/dmaap/consulif/__init__.py b/dmaap/consulif/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/dmaap/consulif/__init__.py diff --git a/dmaap/consulif/consulif.py b/dmaap/consulif/consulif.py new file mode 100644 index 0000000..e742895 --- /dev/null +++ b/dmaap/consulif/consulif.py @@ -0,0 +1,120 @@ +# ============LICENSE_START==================================================== +# org.onap.ccsdk +# ============================================================================= +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +import consul +import json +from urlparse import urlparse + +class ConsulHandle(object): + ''' + Provide access to Consul KV store and service discovery + ''' + + def __init__(self, api_url, user, password, logger): + ''' + Constructor + ''' + u = urlparse(api_url) + self.ch = consul.Consul(host=u.hostname, port=u.port, scheme=u.scheme) + + def get_config(self, key): + ''' + Get configuration information from Consul using the provided key. + It should be in JSON form. Convert it to a dictionary + ''' + (index, val) = self.ch.kv.get(key) + config = json.loads(val['Value']) # will raise ValueError if not JSON, let it propagate + return config + + def get_service(self,service_name): + ''' + Look up the service named service_name in Consul. + Return the service address and port. + ''' + (index, val) = self.ch.catalog.service(service_name) + if len(val) > 0: # catalog.service returns an empty array if service not found + service = val[0] # Could be multiple listings, but we take the first + if ('ServiceAddress' in service) and (len(service['ServiceAddress']) > 0): + service_address = service['ServiceAddress'] # Most services should have this + else: + service_address = service['Address'] # "External" services will have this only + service_port = service['ServicePort'] + else: + raise Exception('Could not find service information for "{0}"'.format(service_name)) + + return service_address, service_port + + def add_to_entry(self, key, add_name, add_value): + ''' + Find 'key' in consul. + Treat its value as a JSON string representing a dict. + Extend the dict by adding an entry with key 'add_name' and value 'add_value'. + Turn the resulting extended dict into a JSON string. + Store the string back into Consul under 'key'. + Watch out for conflicting concurrent updates. + + Example: + Key 'xyz:dmaap' has the value '{"feed00": {"feed_url" : "http://example.com/feeds/999"}}' + add_to_entry('xyz:dmaap', 'topic00', {'topic_url' : 'http://example.com/topics/1229'}) + should result in the value for key 'xyz:dmaap' in consul being updated to + '{"feed00": {"feed_url" : "http://example.com/feeds/999"}, "topic00" : {"topic_url" : "http://example.com/topics/1229"}}' + ''' + + while True: # do until update succeeds + (index, val) = self.ch.kv.get(key) # index gives version of key retrieved + + if val is None: # no key yet + vstring = '{}' + mod_index = 0 # Use 0 as the cas index for initial insertion of the key + else: + vstring = val['Value'] + mod_index = val['ModifyIndex'] + + # Build the updated dict + # Exceptions just propagate + v = json.loads(vstring) + v[add_name] = add_value + new_vstring = json.dumps(v) + + updated = self.ch.kv.put(key, new_vstring, cas=mod_index) # if the key has changed since retrieval, this will return false + if updated: + break + + + def delete_entry(self,entry_name): + ''' + Delete an entire key-value entry whose key is 'entry_name' from the Consul KV store. + + Note that the kv.delete() operation always returns True, + whether there's an entry with key 'entry_name' exists or not. This doesn't seem like + a great design, but it means it's safe to try to delete the same entry repeatedly. + + Note also in our application for this plugin, the uninstall workflow will always delete all of the topics and + feeds we've stored into the 'component_name:dmaap' entry. + + Given the two foregoing notes, it is safe for this plugin to attempt to delete the entire + 'component_name:dmaap' entry any time it performs an 'unlink' operation for a publishes or + subscribes relationship. The first unlink will actually remove the entry, the subsequent ones + will harmlessly try to remove it again. + + The 'correct' approach would be to have a delete_from_entry(self, key, delete_name) that fetches + the entry from Consul, removes only the topic or feed being unlinked, and then puts the resulting + entry back into Consul. It would be very similar to add_from_entry. When there's nothing left + in the entry, then the entire entry would be deleted. + ''' + self.ch.kv.delete(entry_name) diff --git a/dmaap/dmaap.yaml b/dmaap/dmaap.yaml new file mode 100644 index 0000000..1c3ff43 --- /dev/null +++ b/dmaap/dmaap.yaml @@ -0,0 +1,193 @@ +# ============LICENSE_START==================================================== +# org.onap.ccsdk +# ============================================================================= +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + + +# Types and relationships for DMaaP data router feeds + +tosca_definitions_version: cloudify_dsl_1_3 + +imports: + - http://www.getcloudify.org/spec/cloudify/3.4/types.yaml + +plugins: + dmaapplugin: + executor: 'central_deployment_agent' + package_name: cloudifydmaapplugin + package_version: 1.2.0 + + +node_types: + + # Data Router feed to be created + ccsdk.nodes.Feed: + derived_from: cloudify.nodes.Root + + properties: + feed_name: + type: string + required: false + feed_version: + type: string + required: false + feed_description: + type: string + required: false + aspr_classification: + type: string + required: false + + interfaces: + cloudify.interfaces.lifecycle: + create: + implementation: + dmaapplugin.dmaapplugin.dr_lifecycle.create_feed + delete: + implementation: + dmaapplugin.dmaapplugin.dr_lifecycle.delete_feed + + # Existing Data Router feed to be used as target for publishing/subscribing + ccsdk.nodes.ExistingFeed: + derived_from: cloudify.nodes.Root + + properties: + feed_id: + type: string + required: true + + interfaces: + cloudify.interfaces.lifecycle: + configure: + implementation: + dmaapplugin.dmaapplugin.dr_lifecycle.get_existing_feed + + # Existing Global Data Router feed (created via Invenio) to be used as target for bridging + ccsdk.nodes.ExternalTargetFeed: + derived_from: cloudify.nodes.Root + + properties: + url: + type: string + required: true + username: + type: string + required: true + userpw: + type: string + required: true + + # Global Data Router feed to be used as a source for bridging + # Has no properties + ccsdk.nodes.ExternalSourceFeed: + derived_from: cloudify.nodes.Root + + # Message Router topic to be created + ccsdk.nodes.Topic: + derived_from: cloudify.nodes.Root + + properties: + topic_name: + type: string + required: false + topic_description: + type: string + required: false + txenable: + type: boolean + required: false + replication_case: + type: string + required: false + global_mr_url: + type: string + required: false + + interfaces: + cloudify.interfaces.lifecycle: + create: + implementation: + dmaapplugin.dmaapplugin.mr_lifecycle.create_topic + delete: + implementation: + dmaapplugin.dmaapplugin.mr_lifecycle.delete_topic + + # Existing Message Router topic to be used as target for publishing/subscribing + ccsdk.nodes.ExistingTopic: + derived_from: cloudify.nodes.Root + + properties: + fqtn: + type: string + required: true + + interfaces: + cloudify.interfaces.lifecycle: + configure: + implementation: + dmaapplugin.dmaapplugin.mr_lifecycle.get_existing_topic + +relationships: + + ccsdk.relationships.publish_files: + derived_from: cloudify.relationships.connected_to + target_interfaces: + cloudify.interfaces.relationship_lifecycle: + preconfigure: dmaapplugin.dmaapplugin.dr_relationships.add_dr_publisher + unlink: dmaapplugin.dmaapplugin.dr_relationships.delete_dr_publisher + + ccsdk.relationships.subscribe_to_files: + derived_from: cloudify.relationships.connected_to + target_interfaces: + cloudify.interfaces.relationship_lifecycle: + establish: dmaapplugin.dmaapplugin.dr_relationships.add_dr_subscriber + unlink: dmaapplugin.dmaapplugin.dr_relationships.delete_dr_subscriber + + ccsdk.relationships.publish_events: + derived_from: cloudify.relationships.connected_to + target_interfaces: + cloudify.interfaces.relationship_lifecycle: + preconfigure: dmaapplugin.dmaapplugin.mr_relationships.add_mr_publisher + unlink: dmaapplugin.dmaapplugin.mr_relationships.delete_mr_client + + ccsdk.relationships.subscribe_to_events: + derived_from: cloudify.relationships.connected_to + target_interfaces: + cloudify.interfaces.relationship_lifecycle: + preconfigure: dmaapplugin.dmaapplugin.mr_relationships.add_mr_subscriber + unlink: dmaapplugin.dmaapplugin.mr_relationships.delete_mr_client + + ccsdk.relationships.bridges_to: + derived_from: cloudify.relationships.connected_to + target_interfaces: + cloudify.interfaces.relationship_lifecycle: + preconfigure: dmaapplugin.dmaapplugin.dr_bridge.create_dr_bridge + unlink: dmaapplugin.dmaapplugin.dr_bridge.remove_dr_bridge + + ccsdk.relationships.bridges_to_external: + derived_from: cloudify.relationships.connected_to + target_interfaces: + cloudify.interfaces.relationship_lifecycle: + preconfigure: dmaapplugin.dmaapplugin.dr_bridge.create_external_dr_bridge + unlink: dmaapplugin.dmaapplugin.dr_bridge.remove_dr_bridge + + ccsdk.relationships.bridges_from_external_to_internal: + derived_from: cloudify.relationships.connected_to + target_interfaces: + cloudify.interfaces.relationship_lifecycle: + preconfigure: dmaapplugin.dmaapplugin.dr_bridge.create_external_source_dr_bridge + unlink: dmaapplugin.dmaapplugin.dr_bridge.remove_dr_bridge + diff --git a/dmaap/dmaapcontrollerif/__init__.py b/dmaap/dmaapcontrollerif/__init__.py new file mode 100644 index 0000000..611169f --- /dev/null +++ b/dmaap/dmaapcontrollerif/__init__.py @@ -0,0 +1 @@ +# DMaaP Bus Controller interface library
\ No newline at end of file diff --git a/dmaap/dmaapcontrollerif/dmaap_requests.py b/dmaap/dmaapcontrollerif/dmaap_requests.py new file mode 100644 index 0000000..eb6fe1b --- /dev/null +++ b/dmaap/dmaapcontrollerif/dmaap_requests.py @@ -0,0 +1,256 @@ +# ============LICENSE_START==================================================== +# org.onap.ccsdk +# ============================================================================= +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +import requests + +### "Constants" +FEEDS_PATH = '/feeds' +PUBS_PATH = '/dr_pubs' +SUBS_PATH = '/dr_subs' +TOPICS_PATH = '/topics' +CLIENTS_PATH = '/mr_clients' +LOCATIONS_PATH = '/dcaeLocations' + +class DMaaPControllerHandle(object): + ''' + A simple wrapper class to map DMaaP bus controller API calls into operations supported by the requests module + ''' + + def __init__(self, api_url, user, password, logger, + feeds_path = FEEDS_PATH, + pubs_path = PUBS_PATH, + subs_path = SUBS_PATH, + topics_path = TOPICS_PATH, + clients_path = CLIENTS_PATH): + ''' + Constructor + ''' + self.api_url = api_url # URL for the root of the Controller resource tree, no trailing "/" + self.auth = (user, password) # user name and password for HTTP basic auth + self.logger = logger + self.feeds_path = feeds_path + self.pubs_path = pubs_path + self.subs_path = subs_path + self.topics_path = topics_path + self.clients_path = clients_path + + + ### INTERNAL FUNCTIONS ### + + def _make_url(self, path): + ''' + Make a full URL given the path relative to the root + ''' + if not path.startswith('/'): + path = '/' + path + + return self.api_url + path + + def _get_resource(self, path): + ''' + Get the DMaaP resource at path, where path is relative to the root. + ''' + url = self._make_url(path) + self.logger.info("Querying URL: {0}".format(url)) + return requests.get(url, auth=self.auth) + + def _create_resource(self, path, resource_content): + ''' + Create a DMaaP resource by POSTing to the resource collection + identified by path (relative to root), using resource_content as the body of the post + ''' + url = self._make_url(path) + self.logger.info("Posting to URL: {0} with body: {1}".format(url, resource_content)) + return requests.post(url, auth=self.auth, json=resource_content) + + def _delete_resource(self, path): + ''' + Delete the DMaaP resource at path, where path is relative to the root. + ''' + url = self._make_url(path) + self.logger.info("Deleting URL: {0}".format(url)) + return requests.delete(url, auth=self.auth) + + ### PUBLIC API ### + + # Data Router Feeds + def create_feed(self, name, version=None, description=None, aspr_class=None, owner=None): + ''' + Create a DMaaP data router feed with the given feed name + and (optionally) feed version, feed description, ASPR classification, + and owner + ''' + feed_definition = {'feedName' : name} + if version: + feed_definition['feedVersion'] = version + if description: + feed_definition['feedDescription'] = description + if aspr_class: + feed_definition['asprClassification'] = aspr_class + if owner: + feed_definition['owner'] = owner + + return self._create_resource(self.feeds_path, feed_definition) + + def get_feed_info(self, feed_id): + ''' + Get the representation of the DMaaP data router feed whose feed id is feed_id. + ''' + return self._get_resource("{0}/{1}".format(self.feeds_path, feed_id)) + + def delete_feed(self, feed_id): + ''' + Delete the DMaaP data router feed whose feed id is feed_id. + ''' + return self._delete_resource("{0}/{1}".format(self.feeds_path, feed_id)) + + # Data Router Publishers + def add_publisher(self, feed_id, location, username, password, status=None): + ''' + Add a publisher to feed feed_id at location location with user, pass, and status + ''' + publisher_definition = { + 'feedId' : feed_id, + 'dcaeLocationName' : location, + 'username' : username, + 'userpwd' : password + } + + if status: + publisher_definition['status'] = status + + return self._create_resource(self.pubs_path, publisher_definition) + + def get_publisher_info(self, pub_id): + ''' + Get the representation of the DMaaP data router publisher whose publisher id is pub_id + ''' + return self._get_resource("{0}/{1}".format(self.pubs_path, pub_id)) + + def delete_publisher(self, pub_id): + ''' + Delete the DMaaP data router publisher whose publisher id is id. + ''' + return self._delete_resource("{0}/{1}".format(self.pubs_path, pub_id)) + + + # Data Router SUbscrihers + def add_subscriber(self, feed_id, location, delivery_url, username, password, status=None): + ''' + Add a publisher to feed feed_id at location location with user, pass, and status + ''' + subscriber_definition = { + 'feedId' : feed_id, + 'dcaeLocationName' : location, + 'deliveryURL' : delivery_url, + 'username' : username, + 'userpwd' : password + } + + if status: + subscriber_definition['status'] = status + + return self._create_resource(self.subs_path, subscriber_definition) + + def get_subscriber_info(self, sub_id): + ''' + Get the representation of the DMaaP data router subscriber whose subscriber id is sub_id + ''' + return self._get_resource("{0}/{1}".format(self.subs_path, sub_id)) + + def delete_subscriber(self, sub_id): + ''' + Delete the DMaaP data router subscriber whose subscriber id is sub_id. + ''' + return self._delete_resource("{0}/{1}".format(self.subs_path, sub_id)) + + # Message router topics + def create_topic(self, name, description = None, txenable = None, owner = None, replication_case = None, global_mr_url = None): + ''' + Create a message router topic with the topic name 'name' and optionally the topic_description + 'description', the 'txenable' flag and the topic owner 'owner'. + ''' + topic_definition = {'topicName' : name}; + if description: + topic_definition['topicDescription'] = description + if owner: + topic_definition['owner'] = owner + if txenable != None: # It's a boolean! + topic_definition['txenable'] = txenable + if replication_case: + topic_definition['replicationCase'] = replication_case + if global_mr_url: + topic_definition['globalMrURL'] = global_mr_url + + return self._create_resource(self.topics_path, topic_definition) + + def get_topic_info(self, fqtn): + ''' + Get information about the topic whose fully-qualified name is 'fqtn' + ''' + return self._get_resource("{0}/{1}".format(self.topics_path, fqtn)) + + def delete_topic(self, fqtn): + ''' + Delete the topic whose fully qualified name is 'fqtn' + ''' + return self._delete_resource("{0}/{1}".format(self.topics_path, fqtn)) + + # Message route clients (publishers and subscribers + def create_client(self, fqtn, location, client_role, actions): + ''' + Creates a client authorized to access the topic with fully-qualified name 'fqtn', + from the location 'location', using the AAF client role 'client_role'. The + client is authorized to perform actions in the list 'actions'. (Valid + values are 'pub', 'sub', and 'view' + ''' + client_definition = { + 'fqtn' : fqtn, + 'dcaeLocationName' : location, + 'clientRole' : client_role, + 'action' : actions + } + return self._create_resource(self.clients_path, client_definition) + + def get_client_info(self, client_id): + ''' + Get client information for the client whose client ID is 'client_id' + ''' + return self._get_resource("{0}/{1}".format(self.clients_path, client_id)) + + def delete_client(self, client_id): + ''' + Delete the client whose client ID is 'client_id' + ''' + return self._delete_resource("{0}/{1}".format(self.clients_path, client_id)) + + def get_dcae_locations(self, dcae_layer): + ''' + Get the list of location names known to the DMaaP bus controller + whose "dcaeLayer" property matches dcae_layer and whose status is "VALID". + "dcaeLayer" is "opendcae-central" for central sites. + ''' + # Do these as a separate step so things like 404 get reported precisely + locations = self._get_resource(LOCATIONS_PATH) + locations.raise_for_status() + + # pull out location names for VALID locations with matching dcae_layer + return map(lambda l: l["dcaeLocationName"], + filter(lambda i : (i['dcaeLayer'] == dcae_layer and i['status'] == 'VALID'), + locations.json())) + diff --git a/dmaap/dmaapplugin/__init__.py b/dmaap/dmaapplugin/__init__.py new file mode 100644 index 0000000..130c0bf --- /dev/null +++ b/dmaap/dmaapplugin/__init__.py @@ -0,0 +1,46 @@ +# ============LICENSE_START==================================================== +# org.onap.ccsdk +# ============================================================================= +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +## Get parameters for accessing the DMaaP controller +from consulif.consulif import ConsulHandle +from cloudify.exceptions import NonRecoverableError + +CONSUL_HOST = "127.0.0.1" # Should always be a local consul agent on Cloudify Manager +CM_SERVICE_NAME = "cloudify_manager" # Name under which CM is registered, used as key to get config +DBC_SERVICE_NAME= "dmaap_bus_controller" # Name under which the DMaaP bus controller is registered + +try: + _ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, None) + config = _ch.get_config(CM_SERVICE_NAME) + DMAAP_USER = config['dmaap']['username'] + DMAAP_PASS = config['dmaap']['password'] + DMAAP_OWNER = config['dmaap']['owner'] + if 'protocol' in config['dmaap']: + DMAAP_PROTOCOL = config['dmaap']['protocol'] + else: + DMAAP_PROTOCOL = 'https' # Default to https (service discovery should give us this but doesn't + if 'path' in config['dmaap']: + DMAAP_PATH = config['dmaap']['path'] + else: + DMAAP_PATH = 'webapi' # SHould come from service discovery but Consul doesn't support it + + service_address, service_port = _ch.get_service(DBC_SERVICE_NAME) + DMAAP_API_URL = '{0}://{1}:{2}/{3}'.format(DMAAP_PROTOCOL, service_address, service_port, DMAAP_PATH) + +except Exception as e: + raise NonRecoverableError("Error configuring dmaap plugin: {0}".format(e)) diff --git a/dmaap/dmaapplugin/dmaaputils.py b/dmaap/dmaapplugin/dmaaputils.py new file mode 100644 index 0000000..e043a07 --- /dev/null +++ b/dmaap/dmaapplugin/dmaaputils.py @@ -0,0 +1,28 @@ +# ============LICENSE_START==================================================== +# org.onap.ccsdk +# ============================================================================= +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +# Utility functions + +import string +import random + +def random_string(n): + ''' + Create a random alphanumeric string, n characters long. + ''' + return ''.join(random.choice(string.ascii_lowercase + string.ascii_uppercase + string.digits) for x in range(n)) diff --git a/dmaap/dmaapplugin/dr_bridge.py b/dmaap/dmaapplugin/dr_bridge.py new file mode 100644 index 0000000..4e0df4d --- /dev/null +++ b/dmaap/dmaapplugin/dr_bridge.py @@ -0,0 +1,198 @@ +# ============LICENSE_START==================================================== +# org.onap.ccsdk +# ============================================================================= +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +from cloudify import ctx +from cloudify.decorators import operation +from cloudify.exceptions import NonRecoverableError +from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS +from dmaaputils import random_string +from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle + +# Set up a subscriber to a source feed +def _set_up_subscriber(dmc, source_feed_id, loc, delivery_url, username, userpw): + # Add subscriber to source feed + add_sub = dmc.add_subscriber(source_feed_id, loc, delivery_url, username, userpw) + add_sub.raise_for_status() + return add_sub.json() + +# Set up a publisher to a target feed +def _set_up_publisher(dmc, target_feed_id, loc): + username = random_string(8) + userpw = random_string(10) + add_pub = dmc.add_publisher(target_feed_id, loc, username, userpw) + add_pub.raise_for_status() + pub_info = add_pub.json() + return pub_info["pubId"], username, userpw + +# Get a central location to use when creating a publisher or subscriber +def _get_central_location(dmc): + locations = dmc.get_dcae_locations('opendcae-central') + if len(locations) < 1: + raise Exception('No central location found for setting up DR bridging') + return locations[0] # We take the first one. Typically there will be two central locations + + +# Set up a "bridge" between two feeds internal to DCAE +# A source feed "bridges_to" a target feed, meaning that anything published to +# the source feed will be delivered to subscribers to the target feed (as well as +# to subscribers of the source feed). +# +# The bridge is established by first adding a publisher to the target feed. The result of doing this +# is a publish URL and a set of publication credentials. +#The publish URL and publication credentials are used to set up a subscriber to the source feed. +#I.e., we tell the source feed to deliver to an endpoint which is actually a publish +# endpoint for the target feed. +@operation +def create_dr_bridge(**kwargs): + + try: + + # Get source and target feed ids + if 'feed_id' in ctx.target.instance.runtime_properties: + target_feed_id = ctx.target.instance.runtime_properties['feed_id'] + else: + raise Exception('Target feed has no feed_id property') + if 'feed_id' in ctx.source.instance.runtime_properties: + source_feed_id = ctx.source.instance.runtime_properties['feed_id'] + else: + raise Exception('Source feed has no feed_id property') + + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + + # Get a location to use when creating a publisher or subscriber--a central location seems reasonable + loc = _get_central_location(dmc) + + ctx.logger.info('Creating bridge from feed {0} to feed {1} using location {2}'.format(source_feed_id, target_feed_id, loc)) + + # Add publisher to target feed + publisher_id, username, userpw = _set_up_publisher(dmc, target_feed_id, loc) + ctx.logger.info("Added publisher id {0} to target feed {1} with user {2}".format(publisher_id, target_feed_id, username)) + + # Add subscriber to source feed + delivery_url = ctx.target.instance.runtime_properties['publish_url'] + subscriber_info = _set_up_subscriber(dmc, source_feed_id, loc, delivery_url, username, userpw) + subscriber_id = subscriber_info["subId"] + ctx.logger.info("Added subscriber id {0} to source feed {1} with delivery url {2}".format(subscriber_id, source_feed_id, delivery_url)) + + # Save the publisher and subscriber IDs on the source node, indexed by the target node id + ctx.source.instance.runtime_properties[ctx.target.node.id] = {"publisher_id": publisher_id, "subscriber_id": subscriber_id} + + except Exception as e: + ctx.logger.error("Error creating bridge: {0}".format(e)) + raise NonRecoverableError(e) + +# Set up a bridge from an internal DCAE feed to a feed in an external Data Router system +# The target feed needs to be provisioned in the external Data Router system. A publisher +# to that feed must also be set up in the external Data Router system. The publish URL, +# username, and password need to be captured in a target node of type dcae.nodes.ExternalTargetFeed. +# The bridge is established by setting up a subscriber to the internal DCAE source feed using the +# external feed publisher parameters as delivery parameters for the subscriber. +@operation +def create_external_dr_bridge(**kwargs): + try: + + # Make sure target feed has full set of properties + if 'url' in ctx.target.node.properties and 'username' in ctx.target.node.properties and 'userpw' in ctx.target.node.properties: + url = ctx.target.node.properties['url'] + username = ctx.target.node.properties['username'] + userpw = ctx.target.node.properties['userpw'] + else: + raise Exception ("Target feed missing url, username, and/or user pw") + + # Make sure source feed has a feed ID + if 'feed_id' in ctx.source.instance.runtime_properties: + source_feed_id = ctx.source.instance.runtime_properties['feed_id'] + else: + raise Exception('Source feed has no feed_id property') + + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + + # Get a central location to use when creating subscriber + loc = _get_central_location(dmc) + + ctx.logger.info('Creating external bridge from feed {0} to external url {1} using location {2}'.format(source_feed_id, url, loc)) + + # Create subscription to source feed using properties of the external target feed + subscriber_info = _set_up_subscriber(dmc, source_feed_id, loc, url, username, userpw) + subscriber_id = subscriber_info["subId"] + ctx.logger.info("Added subscriber id {0} to source feed {1} with delivery url {2}".format(subscriber_id, source_feed_id, url)) + + # Save the subscriber ID on the source node, indexed by the target node id + ctx.source.instance.runtime_properties[ctx.target.node.id] = {"subscriber_id": subscriber_id} + + except Exception as e: + ctx.logger.error("Error creating external bridge: {0}".format(e)) + raise NonRecoverableError(e) + +# Set up a bridge from a feed in an external Data Router system to an internal DCAE feed. +# The bridge is established by creating a publisher on the internal DCAE feed. Then a subscription +# to the external feed is created through manual provisioning in the external Data Router system, using +# the publish URL and the publisher username and password for the internal feed as the delivery parameters +# for the external subscription. +# In order to obtain the publish URL, publisher username, and password, a blueprint using this sort of +# bridge will typically have an output that exposes the runtime_property set on the source node in this operation. +@operation +def create_external_source_dr_bridge(**kwargs): + try: + # Get target feed id + if 'feed_id' in ctx.target.instance.runtime_properties: + target_feed_id = ctx.target.instance.runtime_properties['feed_id'] + else: + raise Exception('Target feed has no feed_id property') + + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + + # Get a central location to use when creating a publisher + loc = _get_central_location(dmc) + + # Create a publisher on the target feed + publisher_id, username, userpw = _set_up_publisher(dmc, target_feed_id, loc) + + # Save the publisher info on the source node, indexed by the target node + ctx.source.instance.runtime_properties[ctx.target.node.id] = {"publisher_id": publisher_id, "url": ctx.target.instance.runtime_properties["publish_url"], "username": username, "userpw": userpw} + + except Exception as e: + ctx.logger.error("Error creating external source bridge: {0}".format(e)) + +# Remove the bridge between the relationship source and target. +# For a bridge between 2 internal feeds, deletes the subscriber on the source feed and the publisher on the target feed. +# For a bridge to an external target feed, deletes the subscriber on the source feed. +# For a bridge from an external source feed, deletes the publisher on the target feed. +@operation +def remove_dr_bridge(**kwargs): + try: + + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + + if ctx.target.node.id in ctx.source.instance.runtime_properties: + + if 'subscriber_id' in ctx.source.instance.runtime_properties[ctx.target.node.id]: + # Delete the subscription for this bridge + ctx.logger.info("Removing bridge -- deleting subscriber {0}".format(ctx.source.instance.runtime_properties[ctx.target.node.id]['subscriber_id'])) + dmc.delete_subscriber(ctx.source.instance.runtime_properties[ctx.target.node.id]['subscriber_id']) + + if 'publisher_id' in ctx.source.instance.runtime_properties: + # Delete the publisher for this bridge + ctx.logger.info("Removing bridge -- deleting publisher {0}".format(ctx.source.instance.runtime_properties[ctx.target.node.id]['publisher_id'])) + dmc.delete_publisher(ctx.source.instance.runtime_properties[ctx.target.node.id]['publisher_id']) + + ctx.logger.info("Remove bridge from {0} to {1}".format(ctx.source.node.id, ctx.target.node.id)) + + except Exception as e: + ctx.logger.error("Error removing bridge: {0}".format(e)) + # Let the uninstall workflow proceed--don't throw a NonRecoverableError diff --git a/dmaap/dmaapplugin/dr_lifecycle.py b/dmaap/dmaapplugin/dr_lifecycle.py new file mode 100644 index 0000000..45f8674 --- /dev/null +++ b/dmaap/dmaapplugin/dr_lifecycle.py @@ -0,0 +1,121 @@ +# ============LICENSE_START==================================================== +# org.onap.ccsdk +# ============================================================================= +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +from cloudify import ctx +from cloudify.decorators import operation +from cloudify.exceptions import NonRecoverableError +from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, DMAAP_OWNER +from dmaaputils import random_string +from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle + +# Lifecycle operations for DMaaP Data Router feeds + +@operation +def create_feed(**kwargs): + ''' + Create a new data router feed + Expects "feed_name" to be set in node properties + Allows "feed_version", "feed_description", and "aspr_classification" as optional properties + (Sets default values if not provided ) + Sets instance runtime properties: + - "feed_id" + - "publish_url" + - "log_url" + + ''' + try: + # Make sure there's a feed_name + if "feed_name" in ctx.node.properties.keys(): + feed_name = ctx.node.properties["feed_name"] + else: + feed_name = random_string(12) + + # Set defaults/placeholders for the optional properties for the feed + if "feed_version" in ctx.node.properties.keys(): + feed_version = ctx.node.properties["feed_version"] + else: + feed_version = "0.0" + if "feed_description" in ctx.node.properties.keys(): + feed_description = ctx.node.properties["feed_description"] + else: + feed_description = "No description provided" + if "aspr_classification" in ctx.node.properties.keys(): + aspr_classification = ctx.node.properties["aspr_classification"] + else: + aspr_classification = "unclassified" + + # Make the request to the controller + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + ctx.logger.info("Attempting to create feed name {0}".format(feed_name)) + f = dmc.create_feed(feed_name, feed_version, feed_description, aspr_classification, DMAAP_OWNER) + f.raise_for_status() + + # Capture important properties from the result + feed = f.json() + ctx.instance.runtime_properties["feed_id"] = feed["feedId"] + ctx.instance.runtime_properties["publish_url"] = feed["publishURL"] + ctx.instance.runtime_properties["log_url"] = feed["logURL"] + ctx.logger.info("Created feed name {0} with feed id {1}".format(feed_name, feed["feedId"])) + + except Exception as e: + ctx.logger.error("Error creating feed: {er}".format(er=e)) + raise NonRecoverableError(e) + +@operation +def get_existing_feed(**kwargs): + ''' + Find information for an existing data router feed + Expects "feed_id" to be set in node properties -- uniquely identifies the feed + Sets instance runtime properties: + - "feed_id" + - "publish_url" + - "log_url" + ''' + try: + # Make the lookup request to the controller + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + f = dmc.get_feed_info(ctx.node.properties["feed_id"]) + f.raise_for_status() + + # Capture important properties from the result + feed = f.json() + ctx.instance.runtime_properties["feed_id"] = ctx.node.properties["feed_id"] # Just to be consistent with newly-created node, above + ctx.instance.runtime_properties["publish_url"] = feed["publishURL"] + ctx.instance.runtime_properties["log_url"] = feed["logURL"] + ctx.logger.info("Found existing feed with feed id {0}".format(ctx.node.properties["feed_id"])) + + except Exception as e: + ctx.logger.error("Error getting existing feed id {id}: {er}".format(id=ctx.node.properties["feed_id"],er=e)) + raise NonRecoverableError(e) + +@operation +def delete_feed(**kwargs): + ''' + Delete a feed + Expects "feed_id" to be set on the instance's runtime properties + ''' + try: + # Make the lookup request to the controllerid=ctx.node.properties["feed_id"] + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + f = dmc.delete_feed(ctx.instance.runtime_properties["feed_id"]) + f.raise_for_status() + ctx.logger.info("Deleting feed id {0}".format(ctx.instance.runtime_properties["feed_id"])) + + except Exception as e: + ctx.logger.error("Error deleting feed id {id}: {er}".format(id=ctx.instance.runtime_properties["feed_id"],er=e)) + # don't raise a NonRecoverable error here--let the uninstall workflow continue diff --git a/dmaap/dmaapplugin/dr_relationships.py b/dmaap/dmaapplugin/dr_relationships.py new file mode 100644 index 0000000..8796354 --- /dev/null +++ b/dmaap/dmaapplugin/dr_relationships.py @@ -0,0 +1,211 @@ +# ============LICENSE_START==================================================== +# org.onap.ccsdk +# ============================================================================= +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +from cloudify import ctx +from cloudify.decorators import operation +from cloudify.exceptions import NonRecoverableError +from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, CONSUL_HOST +from dmaaputils import random_string +from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle +from consulif.consulif import ConsulHandle + +# Lifecycle operations for DMaaP Data Router +# publish and subscribe relationships + +@operation +def add_dr_publisher(**kwargs): + ''' + Sets up the source of the publishes_relationship as a publisher to the feed that + is the target of the relationship + Assumes target (the feed) has the following runtime properties set + - feed_id + - log_url + - publish_url + Assumes source (the publisher) has a runtime property whose name matches the node name of the feed. + This is a dictionary containing one property: + - location (the dcaeLocationName to pass when adding the publisher to the feed) + Generates a user name and password that the publisher will need to use when publishing + Adds the following properties to the dictionary above: + - publish_url + - log_url + - username + - password + ''' + try: + # Make sure we have a name under which to store DMaaP configuration + # Check early so we don't needlessly create DMaaP entities + if 'service_component_name' not in ctx.source.instance.runtime_properties: + raise Exception("Source node does not have 'service_component_name' in runtime_properties") + + target_feed = ctx.target.node.id + ctx.logger.info("Attempting to add publisher {0} to feed {1}".format(ctx.source.node.id, target_feed)) + + # Set up the parameters for the add_publisher request to the DMaaP bus controller + feed_id = ctx.target.instance.runtime_properties["feed_id"] + location = ctx.source.instance.runtime_properties[target_feed]["location"] + username = random_string(8) + password = random_string(10) + + # Make the request to add the publisher to the feed + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + add_pub = dmc.add_publisher(feed_id, location, username, password) + add_pub.raise_for_status() + publisher_info = add_pub.json() + publisher_id = publisher_info["pubId"] + ctx.logger.info("Added publisher id {0} to feed {1} at {2}, with user {3}, pass {4}".format(publisher_id, feed_id, location, username, password)) + + # Set runtime properties on the source + ctx.source.instance.runtime_properties[target_feed] = { + "publisher_id" : publisher_id, + "location" : location, + "publish_url" : ctx.target.instance.runtime_properties["publish_url"], + "log_url" : ctx.target.instance.runtime_properties["log_url"], + "username" : username, + "password" : password + } + + # Set key in Consul + ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger) + ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_feed, ctx.source.instance.runtime_properties[target_feed]) + + except Exception as e: + ctx.logger.error("Error adding publisher to feed: {er}".format(er=e)) + raise NonRecoverableError(e) + + +@operation +def delete_dr_publisher(**kwargs): + ''' + Deletes publisher (the source of the publishes_files relationship) + from the feed (the target of the relationship). + Assumes that the 'publisher_id' property was added to the dictionary of feed-related properties, + when the publisher was added to the feed. + ''' + + try: + # Make sure we have a name under which to store DMaaP configuration + # Check early so we don't needlessly create DMaaP entities + if 'service_component_name' not in ctx.source.instance.runtime_properties: + raise Exception("Source node does not have 'service_component_name' in runtime_properties") + + # Get the publisher id + target_feed = ctx.target.node.id + publisher_id = ctx.source.instance.runtime_properties[target_feed]["publisher_id"] + ctx.logger.info("Attempting to delete publisher {0}".format(publisher_id, target_feed)) + + # Make the request + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + del_result = dmc.delete_publisher(publisher_id) + del_result.raise_for_status() + + ctx.logger.info("Deleted publisher {0}".format(publisher_id)) + + # Attempt to remove the entire ":dmaap" entry from the Consul KV store + # Will quietly do nothing if the entry has already been removed + ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger) + ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name'])) + + except Exception as e: + ctx.logger.error("Error deleting publisher: {er}".format(er=e)) + # don't raise a NonRecoverable error here--let the uninstall workflow continue + + +@operation +def add_dr_subscriber(**kwargs): + ''' + Sets up the source of the subscribes_to_files relationship as a subscriber to the + feed that is the target of the relationship. + Assumes target (the feed) has the following runtime property set + - feed_id + Assumes source (the subscriber) has a runtime property whose name matches the node name of the feed. + This is a dictionary containing the following properties: + - location (the dcaeLocationName to pass when adding the publisher to the feed) + - delivery_url (the URL to which data router will deliver files) + - username (the username data router will use when delivering files) + - password (the password data router will use when delivering files) + Adds a property to the dictionary above: + - subscriber_id (used to delete the subscriber in the uninstall workflow + ''' + try: + target_feed = ctx.target.node.id + ctx.logger.info("Attempting to add subscriber {0} to feed {1}".format(ctx.source.node.id, target_feed)) + + # Get the parameters for the call + feed_id = ctx.target.instance.runtime_properties["feed_id"] + location = ctx.source.instance.runtime_properties[target_feed]["location"] + delivery_url = ctx.source.instance.runtime_properties[target_feed]["delivery_url"] + username = ctx.source.instance.runtime_properties[target_feed]["username"] + password = ctx.source.instance.runtime_properties[target_feed]["password"] + + # Make the request to add the subscriber to the feed + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + add_sub = dmc.add_subscriber(feed_id, location, delivery_url,username, password) + add_sub.raise_for_status() + subscriber_info = add_sub.json() + subscriber_id = subscriber_info["subId"] + ctx.logger.info("Added subscriber id {0} to feed {1} at {2}".format(subscriber_id, feed_id, location)) + + # Add subscriber_id to the runtime properties + # ctx.source.instance.runtime_properties[target_feed]["subscriber_id"] = subscriber_id + ctx.source.instance.runtime_properties[target_feed] = { + "subscriber_id": subscriber_id, + "location" : location, + "delivery_url" : delivery_url, + "username" : username, + "password" : password + } + ctx.logger.info("on source: {0}".format(ctx.source.instance.runtime_properties[target_feed])) + + # Set key in Consul + ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger) + ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_feed, ctx.source.instance.runtime_properties[target_feed]) + + except Exception as e: + ctx.logger.error("Error adding subscriber to feed: {er}".format(er=e)) + raise NonRecoverableError(e) + + +@operation +def delete_dr_subscriber(**kwargs): + ''' + Deletes subscriber (the source of the subscribes_to_files relationship) + from the feed (the target of the relationship). + Assumes that the source node's runtime properties dictionary for the target feed + includes 'subscriber_id', set when the publisher was added to the feed. + ''' + try: + # Get the subscriber id + target_feed = ctx.target.node.id + subscriber_id = ctx.source.instance.runtime_properties[target_feed]["subscriber_id"] + ctx.logger.info("Attempting to delete subscriber {0} from feed {1}".format(subscriber_id, target_feed)) + + # Make the request + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + del_result = dmc.delete_subscriber(subscriber_id) + del_result.raise_for_status() + + ctx.logger.info("Deleted subscriber {0}".format(subscriber_id)) + + # Attempt to remove the entire ":dmaap" entry from the Consul KV store + # Will quietly do nothing if the entry has already been removed + ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger) + ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name'])) + + except Exception as e: + ctx.logger.error("Error deleting subscriber: {er}".format(er=e)) + # don't raise a NonRecoverable error here--let the uninstall workflow continue diff --git a/dmaap/dmaapplugin/mr_lifecycle.py b/dmaap/dmaapplugin/mr_lifecycle.py new file mode 100644 index 0000000..16ad953 --- /dev/null +++ b/dmaap/dmaapplugin/mr_lifecycle.py @@ -0,0 +1,121 @@ +# ============LICENSE_START==================================================== +# org.onap.ccsdk +# ============================================================================= +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +from cloudify import ctx +from cloudify.decorators import operation +from cloudify.exceptions import NonRecoverableError +from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, DMAAP_OWNER +from dmaaputils import random_string +from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle + +# Lifecycle operations for DMaaP Message Router topics +@operation +def create_topic(**kwargs): + ''' + Creates a message router topic. + Allows 'topic_name', 'topic_description', 'txenable', 'replication_case', + and 'global_mr_url' as optional node properties. If 'topic_name' is not set, + generates a random one. + Sets 'fqtn' in the instance runtime_properties. + Note that 'txenable' is a Message Router flag indicating whether transactions + are enabled on the topic. + ''' + try: + # Make sure there's a topic_name + if "topic_name" in ctx.node.properties: + topic_name = ctx.node.properties["topic_name"] + else: + topic_name = random_string(12) + + # Make sure there's a topic description + if "topic_description" in ctx.node.properties: + topic_description = ctx.node.properties["topic_description"] + else: + topic_description = "No description provided" + + # ..and the truly optional setting + if "txenable" in ctx.node.properties: + txenable = ctx.node.properties["txenable"] + else: + txenable= False + + if "replication_case" in ctx.node.properties: + replication_case = ctx.node.properties["replication_case"] + else: + replication_case = None + + if "global_mr_url" in ctx.node.properties: + global_mr_url = ctx.node.properties["global_mr_url"] + else: + global_mr_url = None + + + # Make the request to the controller + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + ctx.logger.info("Attempting to create topic name {0}".format(topic_name)) + t = dmc.create_topic(topic_name, topic_description, txenable, DMAAP_OWNER, replication_case, global_mr_url) + t.raise_for_status() + + # Capture important properties from the result + topic = t.json() + ctx.instance.runtime_properties["fqtn"] = topic["fqtn"] + + except Exception as e: + ctx.logger.error("Error creating topic: {er}".format(er=e)) + raise NonRecoverableError(e) + +@operation +def get_existing_topic(**kwargs): + ''' + Get data for an existing feed. + Expects 'fqtn' as a node property. + Copies this property to 'fqtn' in runtime properties for consistency + with a newly-created topic. + While there's no real need to make a call to the DMaaP bus controller, + we do so just to make sure the fqtn is known to the controller, so we + don't run into problems when we try to add a publisher or subscriber later. + ''' + try: + fqtn = ctx.node.properties["fqtn"] + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + ctx.logger.info("Attempting to get info for existing topic {0}".format(fqtn)) + t = dmc.get_topic_info(fqtn) + t.raise_for_status() + + ctx.instance.runtime_properties["fqtn"] = ctx.node.properties["fqtn"] + + except Exception as e: + ctx.logger.error("Error getting existing topic: {er}".format(er=e)) + raise NonRecoverableError(e) + +@operation +def delete_topic(**kwargs): + ''' + Delete the topic. Expects the instance runtime property "fqtn" to have been + set when the topic was created. + ''' + try: + fqtn = ctx.instance.runtime_properties["fqtn"] + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + ctx.logger.info("Attempting to delete topic {0}".format(fqtn)) + t = dmc.delete_topic(fqtn) + t.raise_for_status() + + except Exception as e: + ctx.logger.error("Error getting existing topic: {er}".format(er=e)) + # don't raise a NonRecoverable error here--let the uninstall workflow continue diff --git a/dmaap/dmaapplugin/mr_relationships.py b/dmaap/dmaapplugin/mr_relationships.py new file mode 100644 index 0000000..ff92d67 --- /dev/null +++ b/dmaap/dmaapplugin/mr_relationships.py @@ -0,0 +1,119 @@ +# ============LICENSE_START==================================================== +# org.onap.ccsdk +# ============================================================================= +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +from cloudify import ctx +from cloudify.decorators import operation +from cloudify.exceptions import NonRecoverableError +from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, DMAAP_OWNER, CONSUL_HOST +from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle +from consulif.consulif import ConsulHandle + +# Message router relationship operations + +def _add_mr_client(ctype, actions): + ''' + Adds the node represented by 'source' as a client (publisher or subscriber) to + to topic represented by the 'target' node. The list of actions in 'actions' + determines whether the client is a subscriber or a publisher. + + Assumes target (the topic) has the following runtime property set + - fqtn + Assumes source (the client) has a runtime property whose name matches the node name of the feed. + This is a dictionary containing the following properties: + - location (the dcaeLocationName to pass when adding the client to the topic) + - client_role (the AAF client role under which the client will access the topic) + Adds two properties to the dictionary above: + - topic_url (the URL that the client can use to access the topic) + - client_id (used to delete the client in the uninstall workflow) + ''' + try: + # Make sure we have a name under which to store DMaaP configuration + # Check early so we don't needlessly create DMaaP entities + if 'service_component_name' not in ctx.source.instance.runtime_properties: + raise Exception("Source node does not have 'service_component_name' in runtime_properties") + + target_topic = ctx.target.node.id # Key for the source's dictionary with topic-related info + fqtn = ctx.target.instance.runtime_properties["fqtn"] + ctx.logger.info("Attempting to add {0} as {1} to topic {2}".format(ctx.source.node.id, ctype, fqtn)) + + # Get the parameters needed for adding the client + location = ctx.source.instance.runtime_properties[target_topic]["location"] + client_role = ctx.source.instance.runtime_properties[target_topic]["client_role"] + + # Make the request to add the client to the topic + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + c = dmc.create_client(fqtn, location, client_role, actions) + c.raise_for_status() + client_info = c.json() + client_id = client_info["mrClientId"] + topic_url = client_info["topicURL"] + + # Update source's runtime properties + #ctx.source.instance.runtime_properties[target_topic]["topic_url"] = topic_url + #ctx.source.instance.runtime_properties[target_topic]["client_id"] = client_id + ctx.source.instance.runtime_properties[target_topic] = { + "topic_url" : topic_url, + "client_id" : client_id, + "location" : location, + "client_role" : client_role + } + + ctx.logger.info("Added {0} id {1} to feed {2} at {3}".format(ctype, client_id, fqtn, location)) + + # Set key in Consul + ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger) + ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_topic, ctx.source.instance.runtime_properties[target_topic]) + + except Exception as e: + ctx.logger.error("Error adding client to feed: {er}".format(er=e)) + raise NonRecoverableError(e) + +@operation +def add_mr_publisher(**kwargs): + _add_mr_client("publisher", ["view", "pub"]) + +@operation +def add_mr_subscriber(**kwargs): + _add_mr_client("subscriber", ["view", "sub"]) + +@operation +def delete_mr_client(**kwargs): + ''' + Delete the client (publisher or subscriber). + Expect property 'client_id' to have been set in the instance's runtime_properties + when the client was created. + ''' + try: + target_topic = ctx.target.node.id + client_id = ctx.source.instance.runtime_properties[target_topic]["client_id"] + ctx.logger.info("Attempting to delete client {0} ".format(client_id)) + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + c = dmc.delete_client(client_id) + c.raise_for_status() + + ctx.logger.info("Deleted client {0}".format(client_id)) + + # Attempt to remove the entire ":dmaap" entry from the Consul KV store + # Will quietly do nothing if the entry has already been removed + ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger) + ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name'])) + + except Exception as e: + ctx.logger.error("Error deleting MR client: {er}".format(er=e)) + # don't raise a NonRecoverable error here--let the uninstall workflow continue + diff --git a/dmaap/requirements.txt b/dmaap/requirements.txt new file mode 100644 index 0000000..ffdb97f --- /dev/null +++ b/dmaap/requirements.txt @@ -0,0 +1 @@ +python-consul==0.7.0 diff --git a/dmaap/setup.py b/dmaap/setup.py new file mode 100644 index 0000000..0d23668 --- /dev/null +++ b/dmaap/setup.py @@ -0,0 +1,16 @@ +from setuptools import setup, find_packages + +setup( + name = "cloudifydmaapplugin", + version = "1.2.0", + packages=find_packages(), + author = "AT&T", + description = ("Cloudify plugin for creating DMaaP feeds and topics, and setting up publishers and subscribers."), + license = "", + keywords = "", + url = "", + zip_safe=False, + install_requires = [ + "python-consul==0.7.0" + ] +) diff --git a/dmaap/tests/test_plugin.py b/dmaap/tests/test_plugin.py new file mode 100644 index 0000000..b9ebedc --- /dev/null +++ b/dmaap/tests/test_plugin.py @@ -0,0 +1,26 @@ +# ============LICENSE_START==================================================== +# org.onap.ccsdk +# ============================================================================= +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +import pytest +import requests +from cloudify.mocks import MockCloudifyContext +from cloudify.state import current_ctx +from cloudify.exceptions import NonRecoverableError + +def test_noop(): + pass diff --git a/dmaap/tox.ini b/dmaap/tox.ini new file mode 100644 index 0000000..9498c82 --- /dev/null +++ b/dmaap/tox.ini @@ -0,0 +1,26 @@ +# ============LICENSE_START==================================================== +# org.onap.ccsdk +# ============================================================================= +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +[tox] +envlist = py27 +[testenv] +deps= + pytest + cloudify==3.4 + requests +commands=pytest |