From 7e1efe3174336fa09a56c596af55ba93d7b14a91 Mon Sep 17 00:00:00 2001 From: "Hansen, Tony (th1395)" Date: Wed, 13 May 2020 18:55:54 +0000 Subject: move plugins from from ccsdk to dcaegen2 copy dmaap, helm, pgaas and sshkeyshare plugins from ccsdk to dcaegen2 Change-Id: Ib257495de6c275c45f0c87a4b42ac21a2fab7979 Signed-off-by: Hansen, Tony (th1395) Issue-ID: DCAEGEN2-2207 Signed-off-by: Hansen, Tony (th1395) --- dmaap/.gitignore | 4 + dmaap/LICENSE.txt | 17 ++ dmaap/README.md | 324 +++++++++++++++++++++++++++++ dmaap/consulif/__init__.py | 0 dmaap/consulif/consulif.py | 125 ++++++++++++ dmaap/dmaap.yaml | 202 ++++++++++++++++++ dmaap/dmaapcontrollerif/__init__.py | 1 + dmaap/dmaapcontrollerif/dmaap_requests.py | 310 ++++++++++++++++++++++++++++ dmaap/dmaapplugin/__init__.py | 82 ++++++++ dmaap/dmaapplugin/dmaaputils.py | 29 +++ dmaap/dmaapplugin/dr_bridge.py | 199 ++++++++++++++++++ dmaap/dmaapplugin/dr_lifecycle.py | 153 ++++++++++++++ dmaap/dmaapplugin/dr_relationships.py | 219 ++++++++++++++++++++ dmaap/dmaapplugin/mr_lifecycle.py | 143 +++++++++++++ dmaap/dmaapplugin/mr_relationships.py | 119 +++++++++++ dmaap/pom.xml | 327 ++++++++++++++++++++++++++++++ dmaap/requirements.txt | 3 + dmaap/setup.py | 36 ++++ dmaap/tests/conftest.py | 88 ++++++++ dmaap/tests/test_consulif.py | 72 +++++++ dmaap/tests/test_dmaapcontrollerif.py | 113 +++++++++++ dmaap/tests/test_dr_lifecycle.py | 65 ++++++ dmaap/tests/test_mr_lifecycle.py | 59 ++++++ dmaap/tests/test_plugin.py | 26 +++ dmaap/tests/test_utils.py | 26 +++ dmaap/tox.ini | 36 ++++ 26 files changed, 2778 insertions(+) create mode 100644 dmaap/.gitignore create mode 100644 dmaap/LICENSE.txt create mode 100644 dmaap/README.md create mode 100644 dmaap/consulif/__init__.py create mode 100644 dmaap/consulif/consulif.py create mode 100644 dmaap/dmaap.yaml create mode 100644 dmaap/dmaapcontrollerif/__init__.py create mode 100644 dmaap/dmaapcontrollerif/dmaap_requests.py create mode 100644 dmaap/dmaapplugin/__init__.py create mode 100644 dmaap/dmaapplugin/dmaaputils.py create mode 100644 dmaap/dmaapplugin/dr_bridge.py create mode 100644 dmaap/dmaapplugin/dr_lifecycle.py create mode 100644 dmaap/dmaapplugin/dr_relationships.py create mode 100644 dmaap/dmaapplugin/mr_lifecycle.py create mode 100644 dmaap/dmaapplugin/mr_relationships.py create mode 100644 dmaap/pom.xml create mode 100644 dmaap/requirements.txt create mode 100644 dmaap/setup.py create mode 100644 dmaap/tests/conftest.py create mode 100644 dmaap/tests/test_consulif.py create mode 100644 dmaap/tests/test_dmaapcontrollerif.py create mode 100644 dmaap/tests/test_dr_lifecycle.py create mode 100644 dmaap/tests/test_mr_lifecycle.py create mode 100644 dmaap/tests/test_plugin.py create mode 100644 dmaap/tests/test_utils.py create mode 100644 dmaap/tox.ini (limited to 'dmaap') diff --git a/dmaap/.gitignore b/dmaap/.gitignore new file mode 100644 index 0000000..9ef55c9 --- /dev/null +++ b/dmaap/.gitignore @@ -0,0 +1,4 @@ +# local additions to plugins .gitignore +wheels +cdap.zip +docker.zip diff --git a/dmaap/LICENSE.txt b/dmaap/LICENSE.txt new file mode 100644 index 0000000..86c0033 --- /dev/null +++ b/dmaap/LICENSE.txt @@ -0,0 +1,17 @@ +============LICENSE_START======================================================= +org.onap.dcaegen2 +================================================================================ +Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +================================================================================ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +============LICENSE_END========================================================= diff --git a/dmaap/README.md b/dmaap/README.md new file mode 100644 index 0000000..55ac621 --- /dev/null +++ b/dmaap/README.md @@ -0,0 +1,324 @@ +## Cloudify DMaaP Plugin +Cloudify plugin for creating and managing DMaaP Data Router feeds and subscriptions and +DMaaP Message Router topics. The plugin uses the DMaaP Bus Controller API. + +### Plugin Support for DMaaP Data Router +#### Plugin Types for DMaaP Data Router +The Cloudify type definitions for DMaaP Data Router nodes and relationships +are defined in [`dmaap.yaml`](./dmaap.yaml). + +There are four node types for DMaaP Data Router: + +- `dcaegen2.nodes.Feed`: This type represents a feed that does not yet +exist and that should be created when the install workflow is +run against a blueprint that contains a node of this type. + +Property|Type|Required?|Description | +--------|----|---------|--------------------------------------- +feed_name|string|no|a name that identifies the feed (plugin will generate if absent) +feed_version|string|no|version number for the feed (feed_name + feed_version uniquely identify the feed in DR) +feed_description|string|no|human-readable description of the feed +aspr_classification|string|no|AT&T ASPR classification of the feed + + +- `dcaegen2.nodes.ExistingFeed`: This type represents a feed that +already exists. Nodes of this type are placed in a blueprint so +that other nodes in the blueprint can be set up as publishers or +subscribers to the feed. The table below shows the properties that a node +of this type may have. + +Property|Type|Required?|Description +--------|----|---------|---------------------------------------- +feed_id|string|no|Feed identifier assigned by DMaaP when the feed was created +feed_name|string|no|a name that identifies the feed + +- `dcaegen2.nodes.ExternalTargetFeed`: This type represents a feed created in an external DMaaP +environment (i.e., an environment that the plugin cannot access to make provisioning requests, such as +a shared corporate system). Nodes of this type are placed in a blueprint so that other feed nodes of +type `dcaegen2.nodes.Feed` or `dcaegen2.nodes.ExistingFeed` can be set up to "bridge" to external feeds by +publishing data to the external feeds. The table below shows the properties that a node of this type +may have. + +Property|Type|Required?|Description +--------|----|---------|---------------------------------------- +url|string|yes|The publish URL of the external feed. +username|string|yes|The username to be used when delivering to the external feed +userpw|string|yes|The password to be used when delivering to the external feed + +_Note: These properties are usually obtained by manually creating a feed in the external +DMaaP DR system and then creating a publisher for that feed._ + +- `dcaegen2.nodes.ExternalSourceFeed`: This type represents a feed created in an external DMaaP +environment (i.e., an environment that the plugin cannot access to makes provisioning requests, such as +a shared corporate system). Nodes of this type are place in a blueprint so that they can be set up to +"bridge" to other feed nodes of type `dcaegen2.nodes.Feed` or `dcaegen2.nodes.ExistingFeed`. This type +has no node properties, but when a bridge is set up, the url, username, and password are attached to the +node as runtime_properties, using the name of the target feed node as the top-level key. + +There are five relationship types for DMaaP Data Router: + +- `dcaegen2.relationships.publish_files`, used to +indicate that the relationship's source node sends is a publisher to the +Data Router feed represented by the relationship's target node. +- `dcaegen2.relationships.subscribe_to_files`, used to +indicate that the relationship's source node is a subscriber to the +Data Router feed represented by the relationship's target node. +- `dcaegen2.relationships.bridges_to`, used to indicate that the relationship's source +node (a `dcaegen2.nodes.Feed` or `dcaegen2.nodes.ExistingFeed`) should be set up +to forward data ("bridge") to the relationship's target feed (another `dcaegen2.nodes.Feed` or +`dcaegen2.nodes.ExistingFeed`). +- `dcaegen2.relationships.bridges_to_external`, used to indicate that the relationship's source +node (a `dcaegen2.nodes.Feed` or `dcaegen2.nodes.ExistingFeed`) should be set up +to forward data ("bridge") to the relationship's target node (a feed in an external DMaaP system, +represented by a `dcaegen2.nodes.ExternalTargetFeed` node). +- `dcaegen2.relationships.bridges_from_external_to_internal`, used to indicate the the relationship's source +node (a feed in an external DMaaP system, represented by a `dcaegen2.nodes.ExternalSourceFeed` node) should be set up to forward date ("bridge") +to the relationship's target node (an internal ONAP feed, represented by a `dcaegen2.nodes.Feed` or `dcaegen2.nodes.ExistingFeed` node). + +The plugin code implements the lifecycle operations needed to create and +delete feeds and to add and remove publishers and subscribers. It also implements +the operations needed to set up bridging between feeds. + +#### Interaction with Other Plugins +When creating a new feed or processing a reference to an existing feed, +the plugin operates independently of other plugins. + +When processing a `dcaegen2.relationships.publish_files` relationship or a +`dcaegen2.relationships.subscribe_to_files` relationship, this plugin needs +to obtain data from the source node and, in the case of `publish_files`, provide +data to the source node. Certain conventions are therefore needed for +passing data between this plugin and the plugins responsible for the source +nodes in these relationships. In Cloudify, the mechanism for +sharing data among plugins is the `ctx.instance.runtime_properties` dictionary +associated with each node. + +A given source node may have relationships with several feeds. For example, an ONAP DCAE +data collector might publish two different types of data to two different feeds. An ONAP DCAE +analytics module might subscribe to one feed to get input for its processing and +publish its results to a different feed. When this DMaaP plugin and the plugin for the +source node exchange information, they need to do in a way that lets them distinguish +among different feeds. We do this through a simple convention: for each source node +to feed relationship, the source node plugin will create a property in the source node's +`runtime_properties` dictionary. The name of the property will be the same as the +name of the target node of the relationship. For instance, if a node has a +`publishes_files` relationship with a target node named `feed00`, then the plugin that's +responsible for managing the source node with create an entry in the source node's +`runtime_properties` dictionary named `feed00`. This entry itself will be a dictionary. + +The content of this data exchange dictionary depends on whether the source node is a +publisher (i.e., the relationship is `publish_files`) or a subscriber (i.e., the +relationship is `subscribe_to_files`). + +For the `publish_files` relationship, the data exchange dictionary has the following +properties: + +Property|Set by|Description +--------|------|------------------------------------------------ +location|source node plugin|the DMaaP location for the publisher, used to set up routing +publish_url|DMaaP plugin|the URL to which the publisher makes Data Router publish requests +log_url|DMaaP plugin|the URL from which log data for the feed can be obtained +username|DMaaP plugin|the username (generated by the DMaaP plugin) the publisher uses to authenticate to Data Router +password|DMaaP plugin|the password (generated by the DMaaP plugin) the publisher uses to authenticate to Data Router + +For the `subscribe_to_files` relationship, the data exchange dictionary has the following +properties: + +Property|Set by|Description +--------|------|------------------------------------------------ +location|source node plugin|the DMaaP location for the subscriber, used to set up routing +delivery_url|source node plugin|the URL to which the Data Router should deliver files +username|source node plugin|the username Data Router uses to authenticate to the subscriber when delivering files +password|source node plugin|the username Data Router uses to authenticate to the subscriber when delivering file + +### Plugin Support for DMaaP Message Router +#### Plugin Types for DMaaP Message Router +The Cloudify type definitions for DMaaP Message Router nodes and relationships +are defined in [`dmaap.yaml`](./dmaap.yaml). + +There are two node types for DMaaP Message Router: + +- `dcaegen2.nodes.Topic`: This type represents a topic that does not yet +exist and that should be created when the install workflow is +run against a blueprint that contains a node of this type. + +Property|Type|Required?|Description +--------|----|---------|--------------------------------------- +topic_name|string|no|a name that uniquely identifies the feed (plugin will generate if absent or is empty string or contain only whitespace) +topic_description|string|no|human-readable description of the feed +txenable|boolean|no|flag indicating whether transactions are enabled for this topic +replication_case|string|no|type of replication required for the topic (defaults to no replication) +global_mr_url|string|no|Global MR host name for replication to a global MR instance + +Note: In order to set up topics, a user should be familiar with message router and how it is configured, +and this README is not the place to explain the details of message router. Here are a couple of pieces of +information that might be helpful. +Currently, the allowed values for `replication_case` are: + +- `REPLICATION_NONE` +- `REPLICATION_EDGE_TO_CENTRAL` +- `REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL` +- `REPLICATION_CENTRAL_TO_EDGE` +- `REPLICATION_CENTRAL_TO_GLOBAL` +- `REPLICATION_GLOBAL_TO_CENTRAL` +- `REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE` + +The `global_mr_url` is actually a host name, not a full URL. It points to a host in a global message router +cluster. (A 'global' message router cluster is one that's not part of ONAP.) + +- `dcaegen2.nodes.ExistingTopic`: This type represents a topic that +already exists. Nodes of this type are placed in a blueprint so +that other nodes in the blueprint can be set up as publishers or +subscribers to the topic. The table below shows the properties that a node +of this type may have. + +Property|Type|Required?|Description +--------|----|---------|---------------------------------------- +fqtn|string|no|fully-qualified topic name for the topic +topic_name|string|no|a name that identifies the topic + +#### Interaction with Other Plugins +When creating a new topic or processing a reference to an existing topic, +the plugin operates independently of other plugins. + +When processing a `dcaegen2.relationships.publish_events` relationship or a +`dcaegen2.relationships.subscribe_to_events` relationship, this plugin needs +to obtain data from and provide data to the source node. Certain conventions are therefore needed for +passing data between this plugin and the plugins responsible for the source +nodes in these relationships. In Cloudify, the mechanism for +sharing data among plugins is the `ctx.instance.runtime_properties` dictionary +associated with each node. + +A given source node may have relationships with several topics. For example, an ONAP DCAE +analytics module might subscribe to one topic to get input for its processing and +publish its results to a different topic. When this DMaaP plugin and the plugin for the +source node exchange information, they need to do in a way that lets them distinguish +among different feeds. We do this through a simple convention: for each source node +to topic relationship, the source node plugin will create a property in the source node's +`runtime_properties` dictionary. The name of the property will be the same as the +name of the target node of the relationship. For instance, if a node has a +`publishes_events` relationship with a target node named `topic00`, then the plugin that's +responsible for managing the source node with create an entry in the source node's +`runtime_properties` dictionary named `topic00`. This entry itself will be a dictionary. + +For both types of relationship, the data exchange dictionary has the following +properties: + +Property|Set by|Description +--------|------|------------------------------------------------ +location|source node plugin|the DMaaP location for the publisher or subscriber, used to set up routing +client_role|source node plugin|the AAF client role that's requesting publish or subscribe access to the topic +topic_url|DMaaP plugin|the URL for accessing the topic to publish or receive events + +### Interaction with Consul configuration store +In addition to storing the results of DMaaP Data Router and DMaaP Message Router provisioning operations in `runtime_properties`, +the DMaaP plugin also stores these results into the ONAP configuration store, which resides in a +[Consul key-value store](https://www.consul.io/). This allows DMaaP clients (components that act as publishers, subscribers, or both) +to retrieve their DMaaP configuration information from Consul, rather than having the plugin that deploys the client directly +configure the client using data in `runtime_properties`. + +The `runtime_properties` for a client must contain a property called `service_component_name`. If this property is not present, +the plugin will raise a NonRecoverableError and cause the installation to fail. + +If `service_component_name` is present, then the plugin will use a Consul key consisting of the value +of `service_component_name` prepended to the fixed string `:dmaap`. For example, if the `service_component_name` +is `client123`, the plugin will use `client123:dmaap` as the key for storing DMaaP information into Consul. +Information for all of the feeds and topics for a client are stored under the same key. + +The value stored is a nested JSON object. At the top level of the object are properties representing each topic or feed +for which the component is a publisher or subscriber. The name of the property is the node name of the target feed or topic. +The value of the property is another JSON object that corresponds to the dictionary that the plugin created in +`runtime_properties` corresponding to the target feed or topic. Note that the information in Consul includes +all of the properties for the feed or topic, those set by the source node plugin as well as those set by the DMaaP plugin. + +Examples: + +Data Router publisher, target feed `feed00`: +``` +{ + "feed00": { + "username": "rC9QR51I", + "log_url": "https://dmaap.example.com/feedlog/972", + "publish_url": "https://dmaap.example.com/publish/972", + "location": "loc00", + "password": "QOQeUh5KLR", + "publisher_id": "972.360gm" + } +} +``` + +Data Router subscriber, target feed `feed01`: +``` +{ + "feed01": { + "username": "drdeliver", + "password": "1loveDataR0uter", + "location": "loc00", + "delivery_url": "https://example.com/whatever", + "subscriber_id": "1550" + } +} +``` + +Message Router publisher to `topic00`, subscriber to `topic01`. Note how each topic +appears as a top-level property in the object. +``` +{ + "topic00": { + "topic_url": "https://dmaap.example.com:3905/events/org.onap.dcaegen2.dmaap.FTL2.outboundx", + "client_role": "org.onap.dcaegen2.member", + "location": "loc00", + "client_id": "1494621774522" + }, + "topic01": { + "topic_url": "https://dmaap.example.com:3905/events/org.onap.dcaegen2.dmaap.FTL2.inboundx", + "client_role": "org.onap.dcaegen2.member", + "location": "loc00", + "client_id": "1494621778627" + } +} +``` + +### Packaging and installing +The DMaaP plugin is meant to be used as a [Cloudify managed plugin](http://docs.getcloudify.org/3.4.0/plugins/using-plugins/). Managed plugins +are packaged using [`wagon`](https://github.com/cloudify-cosmo/wagon). + +To package this plugin, executing the following command in the top-level directory of this plugin, from a Python environment in which `wagon` has been installed: +``` +wagon create -s . -r -o /path/to/directory/for/wagon/output +``` +Once the wagon file is built, it can be uploaded to a Cloudify Manager host using the `cfy plugins upload` command described in the documentation above. + +Managed plugins can also be loaded at the time a Cloudify Manager host is installed, via the installation blueprint and inputs file. We expect that this plugin will +be loaded at Cloudify Manager installation time, and that `cfy plugins upload` will be used only for delivering patches between releases. + +### Configuration +The plugin needs to be configured with certain parameters needed to access the DMaaP Bus Controller. In keeping with the ONAP architecture, this information is +stored in Consul. + +The plugin finds the address and port of the DMaaP Bus Controller using the Consul service discovery facility. The plugin expects the Bus Controller to be +registered under the name `dmaap_bus_controller`. + +Additional parameters come from the `dmaap` key in the Cloudify Manager's Consul configuration, which is stored in the Consul KV store under the key name +'cloudify_manager'. The table below lists the properties in the configuration: + +Property|Type|Required?|Default|Description +--------|----|---------|-------|-------------------------------- +`username`|string|Yes|(none)|The username for logging into DMaaP Bus Controller +`password`|string|Yes|(none)|The password for logging into DMaaP Bus Controller +`owner`|string|Yes|(none)|The name to be used as the owner for entities created by the plugin +`protocol`|string|No|`https`|The protocol (URL scheme) used to access the DMaaP bus controller (`http` or `https`) +`path`|string|No|`webapi`|The path to the root of the DMaaP Bus Controller API endpoint + +Here is an example of a Cloudify Manager configuration object showing only the `dmaap` key: +``` +{ + "dmaap": { + "username": "dmaap.client@dcaegen2orch.onap.org", + "password": "guessmeifyoucan" + "owner": "dcaegen2orc" + }, + + (other configuration here) + +} +``` diff --git a/dmaap/consulif/__init__.py b/dmaap/consulif/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dmaap/consulif/consulif.py b/dmaap/consulif/consulif.py new file mode 100644 index 0000000..a865df4 --- /dev/null +++ b/dmaap/consulif/consulif.py @@ -0,0 +1,125 @@ +# ============LICENSE_START==================================================== +# org.onap.dcaegen2 +# ============================================================================= +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2020 Pantheon.tech. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +import consul +import json +try: + from urllib.parse import urlparse +except ImportError: + from urlparse import urlparse + + +class ConsulHandle(object): + ''' + Provide access to Consul KV store and service discovery + ''' + + def __init__(self, api_url, user, password, logger): + ''' + Constructor + ''' + u = urlparse(api_url) + self.ch = consul.Consul(host=u.hostname, port=u.port, scheme=u.scheme) + + def get_config(self, key): + ''' + Get configuration information from Consul using the provided key. + It should be in JSON form. Convert it to a dictionary + ''' + (index, val) = self.ch.kv.get(key) + config = json.loads(val['Value']) # will raise ValueError if not JSON, let it propagate + return config + + def get_service(self,service_name): + ''' + Look up the service named service_name in Consul. + Return the service address and port. + ''' + (index, val) = self.ch.catalog.service(service_name) + if len(val) > 0: # catalog.service returns an empty array if service not found + service = val[0] # Could be multiple listings, but we take the first + if ('ServiceAddress' in service) and (len(service['ServiceAddress']) > 0): + service_address = service['ServiceAddress'] # Most services should have this + else: + service_address = service['Address'] # "External" services will have this only + service_port = service['ServicePort'] + else: + raise Exception('Could not find service information for "{0}"'.format(service_name)) + + return service_address, service_port + + def add_to_entry(self, key, add_name, add_value): + ''' + Find 'key' in consul. + Treat its value as a JSON string representing a dict. + Extend the dict by adding an entry with key 'add_name' and value 'add_value'. + Turn the resulting extended dict into a JSON string. + Store the string back into Consul under 'key'. + Watch out for conflicting concurrent updates. + + Example: + Key 'xyz:dmaap' has the value '{"feed00": {"feed_url" : "http://example.com/feeds/999"}}' + add_to_entry('xyz:dmaap', 'topic00', {'topic_url' : 'http://example.com/topics/1229'}) + should result in the value for key 'xyz:dmaap' in consul being updated to + '{"feed00": {"feed_url" : "http://example.com/feeds/999"}, "topic00" : {"topic_url" : "http://example.com/topics/1229"}}' + ''' + + while True: # do until update succeeds + (index, val) = self.ch.kv.get(key) # index gives version of key retrieved + + if val is None: # no key yet + vstring = '{}' + mod_index = 0 # Use 0 as the cas index for initial insertion of the key + else: + vstring = val['Value'] + mod_index = val['ModifyIndex'] + + # Build the updated dict + # Exceptions just propagate + v = json.loads(vstring) + v[add_name] = add_value + new_vstring = json.dumps(v) + + updated = self.ch.kv.put(key, new_vstring, cas=mod_index) # if the key has changed since retrieval, this will return false + if updated: + break + + + def delete_entry(self,entry_name): + ''' + Delete an entire key-value entry whose key is 'entry_name' from the Consul KV store. + + Note that the kv.delete() operation always returns True, + whether there's an entry with key 'entry_name' exists or not. This doesn't seem like + a great design, but it means it's safe to try to delete the same entry repeatedly. + + Note also in our application for this plugin, the uninstall workflow will always delete all of the topics and + feeds we've stored into the 'component_name:dmaap' entry. + + Given the two foregoing notes, it is safe for this plugin to attempt to delete the entire + 'component_name:dmaap' entry any time it performs an 'unlink' operation for a publishes or + subscribes relationship. The first unlink will actually remove the entry, the subsequent ones + will harmlessly try to remove it again. + + The 'correct' approach would be to have a delete_from_entry(self, key, delete_name) that fetches + the entry from Consul, removes only the topic or feed being unlinked, and then puts the resulting + entry back into Consul. It would be very similar to add_from_entry. When there's nothing left + in the entry, then the entire entry would be deleted. + ''' + self.ch.kv.delete(entry_name) diff --git a/dmaap/dmaap.yaml b/dmaap/dmaap.yaml new file mode 100644 index 0000000..5b79f9b --- /dev/null +++ b/dmaap/dmaap.yaml @@ -0,0 +1,202 @@ +# ============LICENSE_START==================================================== +# org.onap.dcaegen2 +# ============================================================================= +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + + +# Types and relationships for DMaaP data router feeds + +tosca_definitions_version: cloudify_dsl_1_3 + +plugins: + dmaapplugin: + executor: 'central_deployment_agent' + package_name: dmaap + package_version: 1.4.0 + + +node_types: + + # Data Router feed to be created + dcaegen2.nodes.Feed: + derived_from: cloudify.nodes.Root + + properties: + feed_name: + type: string + required: false + feed_version: + type: string + required: false + feed_description: + type: string + required: false + aspr_classification: + type: string + required: false + useExisting: + type: boolean + required: false + + interfaces: + cloudify.interfaces.lifecycle: + create: + implementation: + dmaapplugin.dmaapplugin.dr_lifecycle.create_feed + delete: + implementation: + dmaapplugin.dmaapplugin.dr_lifecycle.delete_feed + + # Existing Data Router feed to be used as target for publishing/subscribing + dcaegen2.nodes.ExistingFeed: + derived_from: cloudify.nodes.Root + + properties: + feed_id: + type: string + required: false + feed_name: + type: string + required: false + + interfaces: + cloudify.interfaces.lifecycle: + configure: + implementation: + dmaapplugin.dmaapplugin.dr_lifecycle.get_existing_feed + + # Existing Global Data Router feed (created via Invenio) to be used as target for bridging + dcaegen2.nodes.ExternalTargetFeed: + derived_from: cloudify.nodes.Root + + properties: + url: + type: string + required: true + username: + type: string + required: true + userpw: + type: string + required: true + + # Global Data Router feed to be used as a source for bridging + # Has no properties + dcaegen2.nodes.ExternalSourceFeed: + derived_from: cloudify.nodes.Root + + # Message Router topic to be created + dcaegen2.nodes.Topic: + derived_from: cloudify.nodes.Root + + properties: + topic_name: + type: string + required: false + topic_description: + type: string + required: false + txenable: + type: boolean + required: false + replication_case: + type: string + required: false + global_mr_url: + type: string + required: false + useExisting: + type: boolean + required: false + + interfaces: + cloudify.interfaces.lifecycle: + create: + implementation: + dmaapplugin.dmaapplugin.mr_lifecycle.create_topic + delete: + implementation: + dmaapplugin.dmaapplugin.mr_lifecycle.delete_topic + + # Existing Message Router topic to be used as target for publishing/subscribing + dcaegen2.nodes.ExistingTopic: + derived_from: cloudify.nodes.Root + + properties: + fqtn: + type: string + required: false + topic_name: + type: string + required: false + + interfaces: + cloudify.interfaces.lifecycle: + configure: + implementation: + dmaapplugin.dmaapplugin.mr_lifecycle.get_existing_topic + +relationships: + + dcaegen2.relationships.publish_files: + derived_from: cloudify.relationships.connected_to + target_interfaces: + cloudify.interfaces.relationship_lifecycle: + preconfigure: dmaapplugin.dmaapplugin.dr_relationships.add_dr_publisher + unlink: dmaapplugin.dmaapplugin.dr_relationships.delete_dr_publisher + + dcaegen2.relationships.subscribe_to_files: + derived_from: cloudify.relationships.connected_to + target_interfaces: + cloudify.interfaces.relationship_lifecycle: + preconfigure: dmaapplugin.dmaapplugin.dr_relationships.add_dr_subscriber + unlink: dmaapplugin.dmaapplugin.dr_relationships.delete_dr_subscriber + + dcaegen2.relationships.publish_events: + derived_from: cloudify.relationships.connected_to + target_interfaces: + cloudify.interfaces.relationship_lifecycle: + preconfigure: dmaapplugin.dmaapplugin.mr_relationships.add_mr_publisher + unlink: dmaapplugin.dmaapplugin.mr_relationships.delete_mr_client + + dcaegen2.relationships.subscribe_to_events: + derived_from: cloudify.relationships.connected_to + target_interfaces: + cloudify.interfaces.relationship_lifecycle: + preconfigure: dmaapplugin.dmaapplugin.mr_relationships.add_mr_subscriber + unlink: dmaapplugin.dmaapplugin.mr_relationships.delete_mr_client + + dcaegen2.relationships.bridges_to: + derived_from: cloudify.relationships.connected_to + target_interfaces: + cloudify.interfaces.relationship_lifecycle: + preconfigure: dmaapplugin.dmaapplugin.dr_bridge.create_dr_bridge + unlink: dmaapplugin.dmaapplugin.dr_bridge.remove_dr_bridge + + dcaegen2.relationships.bridges_to_external: + derived_from: cloudify.relationships.connected_to + target_interfaces: + cloudify.interfaces.relationship_lifecycle: + preconfigure: dmaapplugin.dmaapplugin.dr_bridge.create_external_dr_bridge + unlink: dmaapplugin.dmaapplugin.dr_bridge.remove_dr_bridge + + dcaegen2.relationships.bridges_from_external_to_internal: + derived_from: cloudify.relationships.connected_to + target_interfaces: + cloudify.interfaces.relationship_lifecycle: + preconfigure: dmaapplugin.dmaapplugin.dr_bridge.create_external_source_dr_bridge + unlink: dmaapplugin.dmaapplugin.dr_bridge.remove_dr_bridge + diff --git a/dmaap/dmaapcontrollerif/__init__.py b/dmaap/dmaapcontrollerif/__init__.py new file mode 100644 index 0000000..611169f --- /dev/null +++ b/dmaap/dmaapcontrollerif/__init__.py @@ -0,0 +1 @@ +# DMaaP Bus Controller interface library \ No newline at end of file diff --git a/dmaap/dmaapcontrollerif/dmaap_requests.py b/dmaap/dmaapcontrollerif/dmaap_requests.py new file mode 100644 index 0000000..039643d --- /dev/null +++ b/dmaap/dmaapcontrollerif/dmaap_requests.py @@ -0,0 +1,310 @@ +# ============LICENSE_START==================================================== +# org.onap.dcaegen2 +# ============================================================================= +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2020 Pantheon.tech. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +import requests + +### "Constants" +FEEDS_PATH = '/feeds' +PUBS_PATH = '/dr_pubs' +SUBS_PATH = '/dr_subs' +TOPICS_PATH = '/topics' +CLIENTS_PATH = '/mr_clients' +LOCATIONS_PATH = '/dcaeLocations' + +class DMaaPControllerHandle(object): + ''' + A simple wrapper class to map DMaaP bus controller API calls into operations supported by the requests module + ''' + + def __init__(self, api_url, user, password, logger, + feeds_path = FEEDS_PATH, + pubs_path = PUBS_PATH, + subs_path = SUBS_PATH, + topics_path = TOPICS_PATH, + clients_path = CLIENTS_PATH): + ''' + Constructor + ''' + self.api_url = api_url # URL for the root of the Controller resource tree, no trailing "/" + self.auth = (user, password) # user name and password for HTTP basic auth + self.logger = logger + self.feeds_path = feeds_path + self.pubs_path = pubs_path + self.subs_path = subs_path + self.topics_path = topics_path + self.clients_path = clients_path + + + ### INTERNAL FUNCTIONS ### + + def _make_url(self, path): + ''' + Make a full URL given the path relative to the root + ''' + if not path.startswith('/'): + path = '/' + path + + return self.api_url + path + + def _get_resource(self, path): + ''' + Get the DMaaP resource at path, where path is relative to the root. + ''' + url = self._make_url(path) + self.logger.info("Querying URL: {0}".format(url)) + return requests.get(url, auth=self.auth) + + def _create_resource(self, path, resource_content): + ''' + Create a DMaaP resource by POSTing to the resource collection + identified by path (relative to root), using resource_content as the body of the post + ''' + url = self._make_url(path) + self.logger.info("Posting to URL: {0} with body: {1}".format(url, resource_content)) + return requests.post(url, auth=self.auth, json=resource_content) + + def _delete_resource(self, path): + ''' + Delete the DMaaP resource at path, where path is relative to the root. + ''' + url = self._make_url(path) + self.logger.info("Deleting URL: {0}".format(url)) + return requests.delete(url, auth=self.auth) + + ### PUBLIC API ### + + # Data Router Feeds + def create_feed(self, name, version=None, description=None, aspr_class=None, owner=None, useExisting=None): + ''' + Create a DMaaP data router feed with the given feed name + and (optionally) feed version, feed description, ASPR classification, + owner, and useExisting flag + ''' + feed_definition = {'feedName' : name} + if version: + feed_definition['feedVersion'] = version + if description: + feed_definition['feedDescription'] = description + if aspr_class: + feed_definition['asprClassification'] = aspr_class + if owner: + feed_definition['owner'] = owner + feeds_path_query = self.feeds_path + if useExisting == True: # It's a boolean! + feeds_path_query += "?useExisting=true" + + return self._create_resource(feeds_path_query, feed_definition) + + def get_feed_info(self, feed_id): + ''' + Get the representation of the DMaaP data router feed whose feed id is feed_id. + ''' + return self._get_resource("{0}/{1}".format(self.feeds_path, feed_id)) + + def get_feed_info_by_name(self, feed_name): + ''' + Get the representation of the DMaaP data router feed whose feed name is feed_name. + ''' + feeds = self._get_resource("{0}".format(self.feeds_path)) + feed_list = feeds.json() + for feed in feed_list: + if feed["feedName"] == feed_name: + self.logger.info("Found feed with {0}".format(feed_name)) + feed_id = feed["feedId"] + return self._get_resource("{0}/{1}".format(self.feeds_path, feed_id)) + + self.logger.info("feed_name {0} not found".format(feed_name)) + return None + + def delete_feed(self, feed_id): + ''' + Delete the DMaaP data router feed whose feed id is feed_id. + ''' + return self._delete_resource("{0}/{1}".format(self.feeds_path, feed_id)) + + # Data Router Publishers + def add_publisher(self, feed_id, location, username, password, status=None): + ''' + Add a publisher to feed feed_id at location location with user, pass, and status + ''' + publisher_definition = { + 'feedId' : feed_id, + 'dcaeLocationName' : location, + 'username' : username, + 'userpwd' : password + } + + if status: + publisher_definition['status'] = status + + return self._create_resource(self.pubs_path, publisher_definition) + + def get_publisher_info(self, pub_id): + ''' + Get the representation of the DMaaP data router publisher whose publisher id is pub_id + ''' + return self._get_resource("{0}/{1}".format(self.pubs_path, pub_id)) + + def delete_publisher(self, pub_id): + ''' + Delete the DMaaP data router publisher whose publisher id is id. + ''' + return self._delete_resource("{0}/{1}".format(self.pubs_path, pub_id)) + + + # Data Router SUbscrihers + def add_subscriber(self, feed_id, location, delivery_url, username, password, decompress, privileged, status=None): + ''' + Add a publisher to feed feed_id at location location with user, pass, and status + ''' + subscriber_definition = { + 'feedId' : feed_id, + 'dcaeLocationName' : location, + 'deliveryURL' : delivery_url, + 'username' : username, + 'userpwd' : password, + 'decompress': decompress, + 'privilegedSubscriber': privileged + } + + if status: + subscriber_definition['status'] = status + + return self._create_resource(self.subs_path, subscriber_definition) + + def get_subscriber_info(self, sub_id): + ''' + Get the representation of the DMaaP data router subscriber whose subscriber id is sub_id + ''' + return self._get_resource("{0}/{1}".format(self.subs_path, sub_id)) + + def delete_subscriber(self, sub_id): + ''' + Delete the DMaaP data router subscriber whose subscriber id is sub_id. + ''' + return self._delete_resource("{0}/{1}".format(self.subs_path, sub_id)) + + # Message router topics + def create_topic(self, name, description = None, txenable = None, owner = None, replication_case = None, global_mr_url = None, useExisting = None): + ''' + Create a message router topic with the topic name 'name' and optionally the topic_description + 'description', the 'txenable' flag, the 'useExisting' flag and the topic owner 'owner'. + ''' + topic_definition = {'topicName' : name} + if description: + topic_definition['topicDescription'] = description + if owner: + topic_definition['owner'] = owner + if txenable != None: # It's a boolean! + topic_definition['txenable'] = txenable + if replication_case: + topic_definition['replicationCase'] = replication_case + if global_mr_url: + topic_definition['globalMrURL'] = global_mr_url + topics_path_query = self.topics_path + if useExisting == True: # It's a boolean! + topics_path_query += "?useExisting=true" + + return self._create_resource(topics_path_query, topic_definition) + + def get_topic_info(self, fqtn): + ''' + Get information about the topic whose fully-qualified name is 'fqtn' + ''' + return self._get_resource("{0}/{1}".format(self.topics_path, fqtn)) + + def get_topic_fqtn_by_name(self, topic_name): + ''' + Get the representation of the DMaaP message router topic fqtn whose topic name is topic_name. + ''' + topics = self._get_resource("{0}".format(self.topics_path)) + topic_list = topics.json() + for topic in topic_list: + if topic["topicName"] == topic_name: + self.logger.info("Found existing topic with name {0}".format(topic_name)) + fqtn = topic["fqtn"] + return fqtn + + self.logger.info("topic_name {0} not found".format(topic_name)) + return None + + def delete_topic(self, fqtn): + ''' + Delete the topic whose fully qualified name is 'fqtn' + ''' + return self._delete_resource("{0}/{1}".format(self.topics_path, fqtn)) + + # Message route clients (publishers and subscribers + def create_client(self, fqtn, location, client_role, actions): + ''' + Creates a client authorized to access the topic with fully-qualified name 'fqtn', + from the location 'location', using the AAF client role 'client_role'. The + client is authorized to perform actions in the list 'actions'. (Valid + values are 'pub', 'sub', and 'view' + ''' + client_definition = { + 'fqtn' : fqtn, + 'dcaeLocationName' : location, + 'clientRole' : client_role, + 'action' : actions + } + return self._create_resource(self.clients_path, client_definition) + + def get_client_info(self, client_id): + ''' + Get client information for the client whose client ID is 'client_id' + ''' + return self._get_resource("{0}/{1}".format(self.clients_path, client_id)) + + def delete_client(self, client_id): + ''' + Delete the client whose client ID is 'client_id' + ''' + return self._delete_resource("{0}/{1}".format(self.clients_path, client_id)) + + def get_dcae_locations(self, dcae_layer): + ''' + Get the list of location names known to the DMaaP bus controller + whose "dcaeLayer" property matches dcae_layer and whose status is "VALID". + ''' + # Do these as a separate step so things like 404 get reported precisely + locations = self._get_resource(LOCATIONS_PATH) + locations.raise_for_status() + + # pull out location names for VALID locations with matching dcae_layer + return [location["dcaeLocationName"] for location in locations.json() + if location['dcaeLayer'] == dcae_layer + and location['status'] == 'VALID'] + + def get_dcae_central_locations(self): + ''' + Get the list of location names known to the DMaaP bus controller + whose "dcaeLayer" property contains "central" (ignoring case) + and whose status is "VALID". + "dcaeLayer" contains "central" for central sites. + ''' + # Do these as a separate step so things like 404 get reported precisely + locations = self._get_resource(LOCATIONS_PATH) + locations.raise_for_status() + + # pull out location names for VALID central locations + return [location["dcaeLocationName"] for location in locations.json() + if 'central' in location['dcaeLayer'].lower() + and location['status'] == 'VALID'] + diff --git a/dmaap/dmaapplugin/__init__.py b/dmaap/dmaapplugin/__init__.py new file mode 100644 index 0000000..7a760d7 --- /dev/null +++ b/dmaap/dmaapplugin/__init__.py @@ -0,0 +1,82 @@ +# ============LICENSE_START==================================================== +# org.onap.dcaegen2 +# ============================================================================= +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2020 Pantheon.tech. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +## Get parameters for accessing the DMaaP controller +from consulif.consulif import ConsulHandle +from cloudify.exceptions import NonRecoverableError +import os + +os.environ["REQUESTS_CA_BUNDLE"]="/opt/onap/certs/cacert.pem" # This is to handle https request thru plugin + +CONSUL_HOST = "consul" # Should always be a local consul agent on Cloudify Manager +DBCL_KEY_NAME = "dmaap-plugin" # Consul key containing DMaaP data bus credentials +# In the ONAP Kubernetes environment, bus controller address is always "dmaap-bc", on port 8080 (http) and 8443 (https) +ONAP_SERVICE_ADDRESS = "dmaap-bc" +HTTP_PORT = "8080" +HTTPS_PORT = "8443" + +try: + _ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, None) +except Exception as e: + raise NonRecoverableError("Error getting ConsulHandle when configuring dmaap plugin: {0}".format(e)) + +try: + config = _ch.get_config(DBCL_KEY_NAME) +except Exception as e: + raise NonRecoverableError("Error getting config for '{0}' from ConsulHandle when configuring dmaap plugin: {1}".format(DBCL_KEY_NAME, e)) + +try: + DMAAP_USER = config['dmaap']['username'] +except Exception as e: + raise NonRecoverableError("Error setting DMAAP_USER while configuring dmaap plugin: {0}".format(e)) + +try: + DMAAP_PASS = config['dmaap']['password'] +except Exception as e: + raise NonRecoverableError("Error setting DMAAP_PASS while configuring dmaap plugin: {0}".format(e)) + +try: + DMAAP_OWNER = config['dmaap']['owner'] +except Exception as e: + raise NonRecoverableError("Error setting DMAAP_OWNER while configuring dmaap plugin: {0}".format(e)) + +try: + if 'protocol' in config['dmaap']: + DMAAP_PROTOCOL = config['dmaap']['protocol'] + service_port = HTTP_PORT + else: + DMAAP_PROTOCOL = 'https' # Default to https (service discovery should give us this but doesn't + service_port = HTTPS_PORT +except Exception as e: + raise NonRecoverableError("Error setting DMAAP_PROTOCOL while configuring dmaap plugin: {0}".format(e)) + +try: + if 'path' in config['dmaap']: + DMAAP_PATH = config['dmaap']['path'] + else: + DMAAP_PATH = 'webapi' # SHould come from service discovery but Consul doesn't support it +except Exception as e: + raise NonRecoverableError("Error setting DMAAP_PATH while configuring dmaap plugin: {0}".format(e)) + +try: + service_address = ONAP_SERVICE_ADDRESS + DMAAP_API_URL = '{0}://{1}:{2}/{3}'.format(DMAAP_PROTOCOL, service_address, service_port, DMAAP_PATH) +except Exception as e: + raise NonRecoverableError("Error setting DMAAP_API_URL while configuring dmaap plugin: {0}".format(e)) + diff --git a/dmaap/dmaapplugin/dmaaputils.py b/dmaap/dmaapplugin/dmaaputils.py new file mode 100644 index 0000000..262abcb --- /dev/null +++ b/dmaap/dmaapplugin/dmaaputils.py @@ -0,0 +1,29 @@ +# ============LICENSE_START==================================================== +# org.onap.dcaegen2 +# ============================================================================= +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +# Utility functions + +import string +from random import SystemRandom + +def random_string(n): + ''' + Create a random alphanumeric string, n characters long. + ''' + secureRandomGen = SystemRandom() + return ''.join(secureRandomGen.choice(string.ascii_lowercase + string.ascii_uppercase + string.digits) for x in range(n)) diff --git a/dmaap/dmaapplugin/dr_bridge.py b/dmaap/dmaapplugin/dr_bridge.py new file mode 100644 index 0000000..a188667 --- /dev/null +++ b/dmaap/dmaapplugin/dr_bridge.py @@ -0,0 +1,199 @@ +# ============LICENSE_START==================================================== +# org.onap.dcaegen2 +# ============================================================================= +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2020 Pantheon.tech. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +from cloudify import ctx +from cloudify.decorators import operation +from cloudify.exceptions import NonRecoverableError +from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS +from dmaapplugin.dmaaputils import random_string +from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle + +# Set up a subscriber to a source feed +def _set_up_subscriber(dmc, source_feed_id, loc, delivery_url, username, userpw): + # Add subscriber to source feed + add_sub = dmc.add_subscriber(source_feed_id, loc, delivery_url, username, userpw) + add_sub.raise_for_status() + return add_sub.json() + +# Set up a publisher to a target feed +def _set_up_publisher(dmc, target_feed_id, loc): + username = random_string(8) + userpw = random_string(16) + add_pub = dmc.add_publisher(target_feed_id, loc, username, userpw) + add_pub.raise_for_status() + pub_info = add_pub.json() + return pub_info["pubId"], username, userpw + +# Get a central location to use when creating a publisher or subscriber +def _get_central_location(dmc): + locations = dmc.get_dcae_central_locations() + if len(locations) < 1: + raise Exception('No central location found for setting up DR bridging') + return locations[0] # We take the first one. Typically there will be two central locations + + +# Set up a "bridge" between two feeds internal to DCAE +# A source feed "bridges_to" a target feed, meaning that anything published to +# the source feed will be delivered to subscribers to the target feed (as well as +# to subscribers of the source feed). +# +# The bridge is established by first adding a publisher to the target feed. The result of doing this +# is a publish URL and a set of publication credentials. +#The publish URL and publication credentials are used to set up a subscriber to the source feed. +#I.e., we tell the source feed to deliver to an endpoint which is actually a publish +# endpoint for the target feed. +@operation +def create_dr_bridge(**kwargs): + + try: + + # Get source and target feed ids + if 'feed_id' in ctx.target.instance.runtime_properties: + target_feed_id = ctx.target.instance.runtime_properties['feed_id'] + else: + raise Exception('Target feed has no feed_id property') + if 'feed_id' in ctx.source.instance.runtime_properties: + source_feed_id = ctx.source.instance.runtime_properties['feed_id'] + else: + raise Exception('Source feed has no feed_id property') + + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + + # Get a location to use when creating a publisher or subscriber--a central location seems reasonable + loc = _get_central_location(dmc) + + ctx.logger.info('Creating bridge from feed {0} to feed {1} using location {2}'.format(source_feed_id, target_feed_id, loc)) + + # Add publisher to target feed + publisher_id, username, userpw = _set_up_publisher(dmc, target_feed_id, loc) + ctx.logger.info("Added publisher id {0} to target feed {1} with user {2}".format(publisher_id, target_feed_id, username)) + + # Add subscriber to source feed + delivery_url = ctx.target.instance.runtime_properties['publish_url'] + subscriber_info = _set_up_subscriber(dmc, source_feed_id, loc, delivery_url, username, userpw) + subscriber_id = subscriber_info["subId"] + ctx.logger.info("Added subscriber id {0} to source feed {1} with delivery url {2}".format(subscriber_id, source_feed_id, delivery_url)) + + # Save the publisher and subscriber IDs on the source node, indexed by the target node id + ctx.source.instance.runtime_properties[ctx.target.node.id] = {"publisher_id": publisher_id, "subscriber_id": subscriber_id} + + except Exception as e: + ctx.logger.error("Error creating bridge: {0}".format(e)) + raise NonRecoverableError(e) + +# Set up a bridge from an internal DCAE feed to a feed in an external Data Router system +# The target feed needs to be provisioned in the external Data Router system. A publisher +# to that feed must also be set up in the external Data Router system. The publish URL, +# username, and password need to be captured in a target node of type dcae.nodes.ExternalTargetFeed. +# The bridge is established by setting up a subscriber to the internal DCAE source feed using the +# external feed publisher parameters as delivery parameters for the subscriber. +@operation +def create_external_dr_bridge(**kwargs): + try: + + # Make sure target feed has full set of properties + if 'url' in ctx.target.node.properties and 'username' in ctx.target.node.properties and 'userpw' in ctx.target.node.properties: + url = ctx.target.node.properties['url'] + username = ctx.target.node.properties['username'] + userpw = ctx.target.node.properties['userpw'] + else: + raise Exception ("Target feed missing url, username, and/or user pw") + + # Make sure source feed has a feed ID + if 'feed_id' in ctx.source.instance.runtime_properties: + source_feed_id = ctx.source.instance.runtime_properties['feed_id'] + else: + raise Exception('Source feed has no feed_id property') + + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + + # Get a central location to use when creating subscriber + loc = _get_central_location(dmc) + + ctx.logger.info('Creating external bridge from feed {0} to external url {1} using location {2}'.format(source_feed_id, url, loc)) + + # Create subscription to source feed using properties of the external target feed + subscriber_info = _set_up_subscriber(dmc, source_feed_id, loc, url, username, userpw) + subscriber_id = subscriber_info["subId"] + ctx.logger.info("Added subscriber id {0} to source feed {1} with delivery url {2}".format(subscriber_id, source_feed_id, url)) + + # Save the subscriber ID on the source node, indexed by the target node id + ctx.source.instance.runtime_properties[ctx.target.node.id] = {"subscriber_id": subscriber_id} + + except Exception as e: + ctx.logger.error("Error creating external bridge: {0}".format(e)) + raise NonRecoverableError(e) + +# Set up a bridge from a feed in an external Data Router system to an internal DCAE feed. +# The bridge is established by creating a publisher on the internal DCAE feed. Then a subscription +# to the external feed is created through manual provisioning in the external Data Router system, using +# the publish URL and the publisher username and password for the internal feed as the delivery parameters +# for the external subscription. +# In order to obtain the publish URL, publisher username, and password, a blueprint using this sort of +# bridge will typically have an output that exposes the runtime_property set on the source node in this operation. +@operation +def create_external_source_dr_bridge(**kwargs): + try: + # Get target feed id + if 'feed_id' in ctx.target.instance.runtime_properties: + target_feed_id = ctx.target.instance.runtime_properties['feed_id'] + else: + raise Exception('Target feed has no feed_id property') + + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + + # Get a central location to use when creating a publisher + loc = _get_central_location(dmc) + + # Create a publisher on the target feed + publisher_id, username, userpw = _set_up_publisher(dmc, target_feed_id, loc) + + # Save the publisher info on the source node, indexed by the target node + ctx.source.instance.runtime_properties[ctx.target.node.id] = {"publisher_id": publisher_id, "url": ctx.target.instance.runtime_properties["publish_url"], "username": username, "userpw": userpw} + + except Exception as e: + ctx.logger.error("Error creating external source bridge: {0}".format(e)) + +# Remove the bridge between the relationship source and target. +# For a bridge between 2 internal feeds, deletes the subscriber on the source feed and the publisher on the target feed. +# For a bridge to an external target feed, deletes the subscriber on the source feed. +# For a bridge from an external source feed, deletes the publisher on the target feed. +@operation +def remove_dr_bridge(**kwargs): + try: + + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + + if ctx.target.node.id in ctx.source.instance.runtime_properties: + + if 'subscriber_id' in ctx.source.instance.runtime_properties[ctx.target.node.id]: + # Delete the subscription for this bridge + ctx.logger.info("Removing bridge -- deleting subscriber {0}".format(ctx.source.instance.runtime_properties[ctx.target.node.id]['subscriber_id'])) + dmc.delete_subscriber(ctx.source.instance.runtime_properties[ctx.target.node.id]['subscriber_id']) + + if 'publisher_id' in ctx.source.instance.runtime_properties: + # Delete the publisher for this bridge + ctx.logger.info("Removing bridge -- deleting publisher {0}".format(ctx.source.instance.runtime_properties[ctx.target.node.id]['publisher_id'])) + dmc.delete_publisher(ctx.source.instance.runtime_properties[ctx.target.node.id]['publisher_id']) + + ctx.logger.info("Remove bridge from {0} to {1}".format(ctx.source.node.id, ctx.target.node.id)) + + except Exception as e: + ctx.logger.error("Error removing bridge: {0}".format(e)) + # Let the uninstall workflow proceed--don't throw a NonRecoverableError diff --git a/dmaap/dmaapplugin/dr_lifecycle.py b/dmaap/dmaapplugin/dr_lifecycle.py new file mode 100644 index 0000000..af37977 --- /dev/null +++ b/dmaap/dmaapplugin/dr_lifecycle.py @@ -0,0 +1,153 @@ +# ============LICENSE_START==================================================== +# org.onap.dcaegen2 +# ============================================================================= +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2020 Pantheon.tech. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +from cloudify import ctx +from cloudify.decorators import operation +from cloudify.exceptions import NonRecoverableError +from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, DMAAP_OWNER +from dmaapplugin.dmaaputils import random_string +from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle + +# Lifecycle operations for DMaaP Data Router feeds + +@operation +def create_feed(**kwargs): + ''' + Create a new data router feed + Expects "feed_name" to be set in node properties + If 'feed_name' is not set or is empty, generates a random one. + Allows "feed_version", "feed_description", "aspr_classification" and "useExisting" as optional properties + (Sets default values if not provided ) + Sets instance runtime properties: + Note that 'useExisting' is a flag indicating whether DBCL will use existing feed if the feed already exists. + - "feed_id" + - "publish_url" + - "log_url" + + ''' + try: + # Make sure there's a feed_name + feed_name = ctx.node.properties.get("feed_name") + if not (feed_name and feed_name.strip()): + feed_name = random_string(12) + + # Set defaults/placeholders for the optional properties for the feed + if "feed_version" in ctx.node.properties: + feed_version = ctx.node.properties["feed_version"] + else: + feed_version = "0.0" + if "feed_description" in ctx.node.properties: + feed_description = ctx.node.properties["feed_description"] + else: + feed_description = "No description provided" + if "aspr_classification" in ctx.node.properties: + aspr_classification = ctx.node.properties["aspr_classification"] + else: + aspr_classification = "unclassified" + if "useExisting" in ctx.node.properties: + useExisting = ctx.node.properties["useExisting"] + else: + useExisting = False + + # Make the request to the controller + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + ctx.logger.info("Attempting to create feed name {0}".format(feed_name)) + f = dmc.create_feed(feed_name, feed_version, feed_description, aspr_classification, DMAAP_OWNER, useExisting) + f.raise_for_status() + + # Capture important properties from the result + feed = f.json() + ctx.instance.runtime_properties["feed_id"] = feed["feedId"] + ctx.instance.runtime_properties["publish_url"] = feed["publishURL"] + ctx.instance.runtime_properties["log_url"] = feed["logURL"] + ctx.logger.info("Created feed name {0} with feed id {1}".format(feed_name, feed["feedId"])) + + except Exception as e: + ctx.logger.error("Error creating feed: {er}".format(er=e)) + raise NonRecoverableError(e) + + +@operation +def get_existing_feed(**kwargs): + ''' + Find information for an existing data router feed + Expects "feed_id" to be set in node properties -- uniquely identifies the feed + Sets instance runtime properties: + - "feed_id" + - "publish_url" + - "log_url" + ''' + + try: + # Make the lookup request to the controller + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + ctx.logger.info("DMaaPControllerHandle() returned") + feed_id_input = False + if "feed_id" in ctx.node.properties: + feed_id_input = True + f = dmc.get_feed_info(ctx.node.properties["feed_id"]) + elif "feed_name" in ctx.node.properties: + feed_name = ctx.node.properties["feed_name"] + f = dmc.get_feed_info_by_name(feed_name) + if f is None: + ctx.logger.error("Not find existing feed with feed name {0}".format(feed_name)) + raise ValueError("Not find existing feed with feed name " + feed_name) + else: + raise ValueError("Either feed_id or feed_name must be defined to get existing feed") + + f.raise_for_status() + + # Capture important properties from the result + feed = f.json() + feed_id = feed["feedId"] + ctx.instance.runtime_properties["feed_id"] = feed_id # Just to be consistent with newly-created node, above + ctx.instance.runtime_properties["publish_url"] = feed["publishURL"] + ctx.instance.runtime_properties["log_url"] = feed["logURL"] + if feed_id_input: + ctx.logger.info("Found existing feed with feed id {0}".format(ctx.node.properties["feed_id"])) + else: + ctx.logger.info("Found existing feed with feed name {0}".format(ctx.node.properties["feed_name"])) + + except ValueError as e: + ctx.logger.error("{er}".format(er=e)) + raise NonRecoverableError(e) + except Exception as e: + if feed_id_input: + ctx.logger.error("Error getting existing feed id {id}: {er}".format(id=ctx.node.properties["feed_id"],er=e)) + else: + ctx.logger.error("Error getting existing feed name {name}: {er}".format(name=ctx.node.properties["feed_name"],er=e)) + raise NonRecoverableError(e) + + +@operation +def delete_feed(**kwargs): + ''' + Delete a feed + Expects "feed_id" to be set on the instance's runtime properties + ''' + try: + # Make the lookup request to the controllerid=ctx.node.properties["feed_id"] + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + f = dmc.delete_feed(ctx.instance.runtime_properties["feed_id"]) + f.raise_for_status() + ctx.logger.info("Deleting feed id {0}".format(ctx.instance.runtime_properties["feed_id"])) + + except Exception as e: + ctx.logger.error("Error deleting feed id {id}: {er}".format(id=ctx.instance.runtime_properties["feed_id"],er=e)) + # don't raise a NonRecoverable error here--let the uninstall workflow continue diff --git a/dmaap/dmaapplugin/dr_relationships.py b/dmaap/dmaapplugin/dr_relationships.py new file mode 100644 index 0000000..f1ff986 --- /dev/null +++ b/dmaap/dmaapplugin/dr_relationships.py @@ -0,0 +1,219 @@ +# ============LICENSE_START==================================================== +# org.onap.dcaegen2 +# ============================================================================= +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2020 Pantheon.tech. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +from cloudify import ctx +from cloudify.decorators import operation +from cloudify.exceptions import NonRecoverableError +from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, CONSUL_HOST +from dmaapplugin.dmaaputils import random_string +from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle +from consulif.consulif import ConsulHandle + +# Lifecycle operations for DMaaP Data Router +# publish and subscribe relationships + +@operation +def add_dr_publisher(**kwargs): + ''' + Sets up the source of the publishes_relationship as a publisher to the feed that + is the target of the relationship + Assumes target (the feed) has the following runtime properties set + - feed_id + - log_url + - publish_url + Assumes source (the publisher) has a runtime property whose name matches the node name of the feed. + This is a dictionary containing one property: + - location (the dcaeLocationName to pass when adding the publisher to the feed) + Generates a user name and password that the publisher will need to use when publishing + Adds the following properties to the dictionary above: + - publish_url + - log_url + - username + - password + ''' + try: + # Make sure we have a name under which to store DMaaP configuration + # Check early so we don't needlessly create DMaaP entities + if 'service_component_name' not in ctx.source.instance.runtime_properties: + raise Exception("Source node does not have 'service_component_name' in runtime_properties") + + target_feed = ctx.target.node.id + ctx.logger.info("Attempting to add publisher {0} to feed {1}".format(ctx.source.node.id, target_feed)) + + # Set up the parameters for the add_publisher request to the DMaaP bus controller + feed_id = ctx.target.instance.runtime_properties["feed_id"] + location = ctx.source.instance.runtime_properties[target_feed]["location"] + username = random_string(8) + password = random_string(16) + + # Make the request to add the publisher to the feed + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + add_pub = dmc.add_publisher(feed_id, location, username, password) + add_pub.raise_for_status() + publisher_info = add_pub.json() + publisher_id = publisher_info["pubId"] + ctx.logger.info("Added publisher id {0} to feed {1} at {2}, with user {3}, pass {4}".format(publisher_id, feed_id, location, username, password)) + + # Set runtime properties on the source + ctx.source.instance.runtime_properties[target_feed] = { + "publisher_id" : publisher_id, + "location" : location, + "publish_url" : ctx.target.instance.runtime_properties["publish_url"], + "log_url" : ctx.target.instance.runtime_properties["log_url"], + "username" : username, + "password" : password + } + + # Set key in Consul + ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger) + cpy = dict(ctx.source.instance.runtime_properties[target_feed]) + ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_feed, cpy) + + except Exception as e: + ctx.logger.error("Error adding publisher to feed: {er}".format(er=e)) + raise NonRecoverableError(e) + + +@operation +def delete_dr_publisher(**kwargs): + ''' + Deletes publisher (the source of the publishes_files relationship) + from the feed (the target of the relationship). + Assumes that the 'publisher_id' property was added to the dictionary of feed-related properties, + when the publisher was added to the feed. + ''' + + try: + # Make sure we have a name under which to store DMaaP configuration + # Check early so we don't needlessly create DMaaP entities + if 'service_component_name' not in ctx.source.instance.runtime_properties: + raise Exception("Source node does not have 'service_component_name' in runtime_properties") + + # Get the publisher id + target_feed = ctx.target.node.id + publisher_id = ctx.source.instance.runtime_properties[target_feed]["publisher_id"] + ctx.logger.info("Attempting to delete publisher {0}".format(publisher_id)) + + # Make the request + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + del_result = dmc.delete_publisher(publisher_id) + del_result.raise_for_status() + + ctx.logger.info("Deleted publisher {0}".format(publisher_id)) + + # Attempt to remove the entire ":dmaap" entry from the Consul KV store + # Will quietly do nothing if the entry has already been removed + ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger) + ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name'])) + + except Exception as e: + ctx.logger.error("Error deleting publisher: {er}".format(er=e)) + # don't raise a NonRecoverable error here--let the uninstall workflow continue + + +@operation +def add_dr_subscriber(**kwargs): + ''' + Sets up the source of the subscribes_to_files relationship as a subscriber to the + feed that is the target of the relationship. + Assumes target (the feed) has the following runtime property set + - feed_id + Assumes source (the subscriber) has a runtime property whose name matches the node name of the feed. + This is a dictionary containing the following properties: + - location (the dcaeLocationName to pass when adding the publisher to the feed) + - delivery_url (the URL to which data router will deliver files) + - username (the username data router will use when delivering files) + - password (the password data router will use when delivering files) + Adds a property to the dictionary above: + - subscriber_id (used to delete the subscriber in the uninstall workflow + ''' + try: + target_feed = ctx.target.node.id + ctx.logger.info("Attempting to add subscriber {0} to feed {1}".format(ctx.source.node.id, target_feed)) + + # Get the parameters for the call + feed_id = ctx.target.instance.runtime_properties["feed_id"] + feed = ctx.source.instance.runtime_properties[target_feed] + location = feed["location"] + delivery_url = feed["delivery_url"] + username = feed["username"] + password = feed["password"] + decompress = feed["decompress"] if "decompress" in feed else False + privileged = feed["privileged"] if "privileged" in feed else False + + # Make the request to add the subscriber to the feed + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + add_sub = dmc.add_subscriber(feed_id, location, delivery_url,username, password, decompress, privileged) + add_sub.raise_for_status() + subscriber_info = add_sub.json() + subscriber_id = subscriber_info["subId"] + ctx.logger.info("Added subscriber id {0} to feed {1} at {2}".format(subscriber_id, feed_id, location)) + + # Add subscriber_id to the runtime properties + # ctx.source.instance.runtime_properties[target_feed]["subscriber_id"] = subscriber_id + ctx.source.instance.runtime_properties[target_feed] = { + "subscriber_id": subscriber_id, + "location" : location, + "delivery_url" : delivery_url, + "username" : username, + "password" : password, + "decompress": decompress, + "privilegedSubscriber": privileged + } + ctx.logger.info("on source: {0}".format(ctx.source.instance.runtime_properties[target_feed])) + + # Set key in Consul + ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger) + cpy = dict(ctx.source.instance.runtime_properties[target_feed]) + ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_feed, cpy) + + except Exception as e: + ctx.logger.error("Error adding subscriber to feed: {er}".format(er=e)) + raise NonRecoverableError(e) + + +@operation +def delete_dr_subscriber(**kwargs): + ''' + Deletes subscriber (the source of the subscribes_to_files relationship) + from the feed (the target of the relationship). + Assumes that the source node's runtime properties dictionary for the target feed + includes 'subscriber_id', set when the publisher was added to the feed. + ''' + try: + # Get the subscriber id + target_feed = ctx.target.node.id + subscriber_id = ctx.source.instance.runtime_properties[target_feed]["subscriber_id"] + ctx.logger.info("Attempting to delete subscriber {0} from feed {1}".format(subscriber_id, target_feed)) + + # Make the request + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + del_result = dmc.delete_subscriber(subscriber_id) + del_result.raise_for_status() + + ctx.logger.info("Deleted subscriber {0}".format(subscriber_id)) + + # Attempt to remove the entire ":dmaap" entry from the Consul KV store + # Will quietly do nothing if the entry has already been removed + ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger) + ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name'])) + + except Exception as e: + ctx.logger.error("Error deleting subscriber: {er}".format(er=e)) + # don't raise a NonRecoverable error here--let the uninstall workflow continue diff --git a/dmaap/dmaapplugin/mr_lifecycle.py b/dmaap/dmaapplugin/mr_lifecycle.py new file mode 100644 index 0000000..6fe3023 --- /dev/null +++ b/dmaap/dmaapplugin/mr_lifecycle.py @@ -0,0 +1,143 @@ +# ============LICENSE_START==================================================== +# org.onap.dcaegen2 +# ============================================================================= +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2020 Pantheon.tech. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +from cloudify import ctx +from cloudify.decorators import operation +from cloudify.exceptions import NonRecoverableError +from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, DMAAP_OWNER +from dmaapplugin.dmaaputils import random_string +from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle + +# Lifecycle operations for DMaaP Message Router topics +@operation +def create_topic(**kwargs): + ''' + Creates a message router topic. + Allows 'topic_name', 'topic_description', 'txenable', 'replication_case', 'global_mr_url', + and 'useExisting' as optional node properties. If 'topic_name' is not set, + generates a random one. + Sets 'fqtn' in the instance runtime_properties. + Note that 'txenable' is a Message Router flag indicating whether transactions + are enabled on the topic. + Note that 'useExisting' is a flag indicating whether DBCL will use existing topic if + the topic already exists. + ''' + try: + # Make sure there's a topic_name + if "topic_name" in ctx.node.properties: + topic_name = ctx.node.properties["topic_name"] + if topic_name == '' or topic_name.isspace(): + topic_name = random_string(12) + else: + topic_name = random_string(12) + + # Make sure there's a topic description + if "topic_description" in ctx.node.properties: + topic_description = ctx.node.properties["topic_description"] + else: + topic_description = "No description provided" + + # ..and the truly optional setting + if "txenable" in ctx.node.properties: + txenable = ctx.node.properties["txenable"] + else: + txenable= False + + if "replication_case" in ctx.node.properties: + replication_case = ctx.node.properties["replication_case"] + else: + replication_case = None + + if "global_mr_url" in ctx.node.properties: + global_mr_url = ctx.node.properties["global_mr_url"] + else: + global_mr_url = None + + if "useExisting" in ctx.node.properties: + useExisting = ctx.node.properties["useExisting"] + else: + useExisting = False + + # Make the request to the controller + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + ctx.logger.info("Attempting to create topic name {0}".format(topic_name)) + t = dmc.create_topic(topic_name, topic_description, txenable, DMAAP_OWNER, replication_case, global_mr_url, useExisting) + t.raise_for_status() + + # Capture important properties from the result + topic = t.json() + ctx.instance.runtime_properties["fqtn"] = topic["fqtn"] + + except Exception as e: + ctx.logger.error("Error creating topic: {er}".format(er=e)) + raise NonRecoverableError(e) + +@operation +def get_existing_topic(**kwargs): + ''' + Get data for an existing topic. + Expects 'fqtn' as a node property. + Copies this property to 'fqtn' in runtime properties for consistency + with a newly-created topic. + While there's no real need to make a call to the DMaaP bus controller, + we do so just to make sure the fqtn is known to the controller, so we + don't run into problems when we try to add a publisher or subscriber later. + ''' + try: + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + fqtn_input = False + if "fqtn" in ctx.node.properties: + fqtn = ctx.node.properties["fqtn"] + fqtn_input = True + elif "topic_name" in ctx.node.properties: + topic_name = ctx.node.properties["topic_name"] + ctx.logger.info("Attempting to get fqtn for existing topic {0}".format(topic_name)) + fqtn = dmc.get_topic_fqtn_by_name(topic_name) + if fqtn is None: + raise ValueError("Not find existing topic with name " + topic_name) + else: + ctx.logger.error("Not find existing topic with name {0}".format(topic_name)) + raise ValueError("Either fqtn or topic_name must be defined to get existing topic") + + ctx.logger.info("Attempting to get info for existing topic {0}".format(fqtn)) + t = dmc.get_topic_info(fqtn) + t.raise_for_status() + + ctx.instance.runtime_properties["fqtn"] = fqtn + + except Exception as e: + ctx.logger.error("Error getting existing topic: {er}".format(er=e)) + raise NonRecoverableError(e) + +@operation +def delete_topic(**kwargs): + ''' + Delete the topic. Expects the instance runtime property "fqtn" to have been + set when the topic was created. + ''' + try: + fqtn = ctx.instance.runtime_properties["fqtn"] + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + ctx.logger.info("Attempting to delete topic {0}".format(fqtn)) + t = dmc.delete_topic(fqtn) + t.raise_for_status() + + except Exception as e: + ctx.logger.error("Error getting existing topic: {er}".format(er=e)) + # don't raise a NonRecoverable error here--let the uninstall workflow continue diff --git a/dmaap/dmaapplugin/mr_relationships.py b/dmaap/dmaapplugin/mr_relationships.py new file mode 100644 index 0000000..34d02e2 --- /dev/null +++ b/dmaap/dmaapplugin/mr_relationships.py @@ -0,0 +1,119 @@ +# ============LICENSE_START==================================================== +# org.onap.dcaegen2 +# ============================================================================= +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +from cloudify import ctx +from cloudify.decorators import operation +from cloudify.exceptions import NonRecoverableError +from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, DMAAP_OWNER, CONSUL_HOST +from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle +from consulif.consulif import ConsulHandle + +# Message router relationship operations + +def _add_mr_client(ctype, actions): + ''' + Adds the node represented by 'source' as a client (publisher or subscriber) to + to topic represented by the 'target' node. The list of actions in 'actions' + determines whether the client is a subscriber or a publisher. + + Assumes target (the topic) has the following runtime property set + - fqtn + Assumes source (the client) has a runtime property whose name matches the node name of the feed. + This is a dictionary containing the following properties: + - location (the dcaeLocationName to pass when adding the client to the topic) + - client_role (the AAF client role under which the client will access the topic) + Adds two properties to the dictionary above: + - topic_url (the URL that the client can use to access the topic) + - client_id (used to delete the client in the uninstall workflow) + ''' + try: + # Make sure we have a name under which to store DMaaP configuration + # Check early so we don't needlessly create DMaaP entities + if 'service_component_name' not in ctx.source.instance.runtime_properties: + raise Exception("Source node does not have 'service_component_name' in runtime_properties") + + target_topic = ctx.target.node.id # Key for the source's dictionary with topic-related info + fqtn = ctx.target.instance.runtime_properties["fqtn"] + ctx.logger.info("Attempting to add {0} as {1} to topic {2}".format(ctx.source.node.id, ctype, fqtn)) + + # Get the parameters needed for adding the client + location = ctx.source.instance.runtime_properties[target_topic]["location"] + client_role = ctx.source.instance.runtime_properties[target_topic]["client_role"] + + # Make the request to add the client to the topic + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + c = dmc.create_client(fqtn, location, client_role, actions) + c.raise_for_status() + client_info = c.json() + client_id = client_info["mrClientId"] + topic_url = client_info["topicURL"] + + # Update source's runtime properties + #ctx.source.instance.runtime_properties[target_topic]["topic_url"] = topic_url + #ctx.source.instance.runtime_properties[target_topic]["client_id"] = client_id + ctx.source.instance.runtime_properties[target_topic] = { + "topic_url" : topic_url, + "client_id" : client_id, + "location" : location, + "client_role" : client_role + } + + ctx.logger.info("Added {0} id {1} to feed {2} at {3}".format(ctype, client_id, fqtn, location)) + + # Set key in Consul + ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger) + ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_topic, ctx.source.instance.runtime_properties[target_topic]) + + except Exception as e: + ctx.logger.error("Error adding client to feed: {er}".format(er=e)) + raise NonRecoverableError(e) + +@operation +def add_mr_publisher(**kwargs): + _add_mr_client("publisher", ["view", "pub"]) + +@operation +def add_mr_subscriber(**kwargs): + _add_mr_client("subscriber", ["view", "sub"]) + +@operation +def delete_mr_client(**kwargs): + ''' + Delete the client (publisher or subscriber). + Expect property 'client_id' to have been set in the instance's runtime_properties + when the client was created. + ''' + try: + target_topic = ctx.target.node.id + client_id = ctx.source.instance.runtime_properties[target_topic]["client_id"] + ctx.logger.info("Attempting to delete client {0} ".format(client_id)) + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + c = dmc.delete_client(client_id) + c.raise_for_status() + + ctx.logger.info("Deleted client {0}".format(client_id)) + + # Attempt to remove the entire ":dmaap" entry from the Consul KV store + # Will quietly do nothing if the entry has already been removed + ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger) + ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name'])) + + except Exception as e: + ctx.logger.error("Error deleting MR client: {er}".format(er=e)) + # don't raise a NonRecoverable error here--let the uninstall workflow continue + diff --git a/dmaap/pom.xml b/dmaap/pom.xml new file mode 100644 index 0000000..afc6089 --- /dev/null +++ b/dmaap/pom.xml @@ -0,0 +1,327 @@ + + + + 4.0.0 + + org.onap.dcaegen2.platform + plugins + 1.2.0-SNAPSHOT + + + + org.onap.dcaegen2.platform.plugins + dmaap + dmaap + + 1.5.0-SNAPSHOT + http://maven.apache.org + + + dmaap + + . + + dmaap.yaml + + type_files/dmaap/dmaap.yaml + UTF-8 + . + xunit-results.xml + coverage.xml + py + Python + **/*.py + tests/*,setup.py + + + + ${project.artifactId}-${project.version} + + + + org.codehaus.mojo + sonar-maven-plugin + 2.7.1 + + + + + org.sonatype.plugins + nexus-staging-maven-plugin + 1.6.7 + + true + + + + + + org.apache.maven.plugins + maven-deploy-plugin + 2.8 + + true + + + + + + + + + + + org.apache.maven.plugins + maven-resources-plugin + 2.6 + + true + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + true + + + + + + org.apache.maven.plugins + maven-jar-plugin + 2.4 + + + default-jar + + + + + + + + org.apache.maven.plugins + maven-install-plugin + 2.4 + + true + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.12.4 + + true + + + + + + org.codehaus.mojo + exec-maven-plugin + + + clean phase script + clean + exec + + ${session.executionRootDirectory}/mvn-phase-script.sh + + ${project.artifactId} + clean + + + + ${project.groupId} + ${project.artifactId} + ${project.version} + ${onap.nexus.url} + ${onap.nexus.rawrepo.baseurl.upload} + ${onap.nexus.rawrepo.baseurl.download} + ${onap.nexus.rawrepo.serverid} + ${plugin.name} + ${plugin.subdir} + + + + + + generate-sources script + generate-sources + exec + + mvn-phase-script.sh + + ${project.artifactId} + generate-sources + + + + ${project.groupId} + ${project.artifactId} + ${project.version} + ${onap.nexus.url} + ${onap.nexus.rawrepo.baseurl.upload} + ${onap.nexus.rawrepo.baseurl.download} + ${onap.nexus.rawrepo.serverid} + + + + + + compile script + compile + exec + + mvn-phase-script.sh + + ${project.artifactId} + compile + + + + ${project.groupId} + ${project.artifactId} + ${project.version} + ${onap.nexus.url} + ${onap.nexus.rawrepo.baseurl.upload} + ${onap.nexus.rawrepo.baseurl.download} + ${onap.nexus.rawrepo.serverid} + + + + + + package script + package + exec + + mvn-phase-script.sh + + ${project.artifactId} + package + + + + ${project.groupId} + ${project.artifactId} + ${project.version} + ${onap.nexus.url} + ${onap.nexus.rawrepo.baseurl.upload} + ${onap.nexus.rawrepo.baseurl.download} + ${onap.nexus.rawrepo.serverid} + ${plugin.name} + ${plugin.subdir} + + + + + + test script + test + exec + + mvn-phase-script.sh + + ${project.artifactId} + test + + + + ${project.groupId} + ${project.artifactId} + ${project.version} + ${onap.nexus.url} + ${onap.nexus.rawrepo.baseurl.upload} + ${onap.nexus.rawrepo.baseurl.download} + ${onap.nexus.rawrepo.serverid} + ${plugin.name} + ${plugin.subdir} + + + + + + install script + install + exec + + mvn-phase-script.sh + + ${project.artifactId} + install + + + + ${project.groupId} + ${project.artifactId} + ${project.version} + ${onap.nexus.url} + ${onap.nexus.rawrepo.baseurl.upload} + ${onap.nexus.rawrepo.baseurl.download} + ${onap.nexus.rawrepo.serverid} + + + + + + deploy script + deploy + exec + + ${session.executionRootDirectory}/mvn-phase-script.sh + + ${project.artifactId} + deploy + + + + ${project.groupId} + ${project.artifactId} + ${project.version} + ${onap.nexus.url} + ${onap.nexus.rawrepo.baseurl.upload} + ${onap.nexus.rawrepo.baseurl.download} + ${onap.nexus.rawrepo.serverid} + ${project.distributionManagement.snapshotRepository.id} + ${typefile.source} + ${typefile.dest} + ${plugin.name} + ${plugin.subdir} + + + + + + + + diff --git a/dmaap/requirements.txt b/dmaap/requirements.txt new file mode 100644 index 0000000..54ffbc4 --- /dev/null +++ b/dmaap/requirements.txt @@ -0,0 +1,3 @@ +python-consul>=0.7.0 +requests +cloudify-common>=5.0.5 diff --git a/dmaap/setup.py b/dmaap/setup.py new file mode 100644 index 0000000..1bdb14f --- /dev/null +++ b/dmaap/setup.py @@ -0,0 +1,36 @@ +# ============LICENSE_START==================================================== +# ============================================================================= +# Copyright (c) 2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2020 Pantheon.tech. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +from setuptools import setup, find_packages + +setup( + name = "dmaap", + version = "1.5.0", + packages=find_packages(), + author = "AT&T", + description = ("Cloudify plugin for creating DMaaP feeds and topics, and setting up publishers and subscribers."), + license = "", + keywords = "", + url = "", + zip_safe=False, + install_requires=[ + 'python-consul>=0.7.0', + 'requests', + 'cloudify-common>=5.0.5', + ], +) diff --git a/dmaap/tests/conftest.py b/dmaap/tests/conftest.py new file mode 100644 index 0000000..9ae7b40 --- /dev/null +++ b/dmaap/tests/conftest.py @@ -0,0 +1,88 @@ +# ============LICENSE_START======================================================= +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# + +import pytest + +import requests + +@pytest.fixture() +def mockconsul(monkeypatch): + """ Override the regular Consul interface""" + def fake_get_config(self, key): + config={'dmaap': { + 'username': 'testuser@dmaaptest.example.com', + 'url': 'https://dmaaptest.example.com:8443/webapi', + 'password' : 'testpassword', + 'owner': 'dcaeorch' + }} + return config + + def fake_get_service(self, service_name): + service_address = "myAddress" + service_port= "8443" + return service_address, service_port + + def fake_add_to_entry(self, key, add_name, add_value): + return True + + def fake_delete_entry(self, entry_name): + return True + + def fake_init(self, api_url, user, password, logger): + pass + + from consulif.consulif import ConsulHandle + monkeypatch.setattr(ConsulHandle, 'get_config', fake_get_config) + monkeypatch.setattr(ConsulHandle, 'get_service', fake_get_service) + monkeypatch.setattr(ConsulHandle, 'add_to_entry', fake_add_to_entry) + monkeypatch.setattr(ConsulHandle, 'delete_entry', fake_delete_entry) + monkeypatch.setattr(ConsulHandle, '__init__', fake_init) + + def get_handle(): + return ConsulHandle('mockconsul', None, None, None) + return get_handle + + +@pytest.fixture() +def mockdmaapbc(monkeypatch): + + def fake_get(url, auth): + # print "fake_get: {0}, {1}".format(url, auth) + r = requests.Response() + r.status_code = 200 + return r + def fake_post(url, auth, json): + # print "fake_post: {0}, {1}, {2}".format(url, auth, json) + r = requests.Response() + r.status_code = 200 + return r + def fake_delete(url, auth): + # print "fake_delete: {0}, {1}".format(url, auth) + r = requests.Response() + r.status_code = 200 + return r + def fake_json(self): + return {"fqtn":"test_fqtn"} + + import requests + monkeypatch.setattr(requests.Response, "json", fake_json) + monkeypatch.setattr(requests, "get", fake_get) + monkeypatch.setattr(requests, "post", fake_post) + monkeypatch.setattr(requests, "delete", fake_delete) + diff --git a/dmaap/tests/test_consulif.py b/dmaap/tests/test_consulif.py new file mode 100644 index 0000000..a45c6a4 --- /dev/null +++ b/dmaap/tests/test_consulif.py @@ -0,0 +1,72 @@ +# ============LICENSE_START======================================================= +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# + + +import pytest +from cloudify.exceptions import NonRecoverableError +import os +from consulif.consulif import ConsulHandle + + +# No connections are actually made to this host +CONSUL_HOST = "consul" # Should always be a local consul agent on Cloudify Manager +#CONSUL_PORT = '8510' +CONSUL_PORT = '8500' +DBCL_KEY_NAME = "dmaap_dbcl_info" # Consul key containing DMaaP data bus credentials +DBC_SERVICE_NAME= "dmaap_bus_controller" # Name under which the DMaaP bus controller is registered + + +def test_get_config_service(mockconsul): + err_msg = "Error getting ConsulHandle when configuring dmaap plugin: {0}" + _ch = ConsulHandle("http://{0}:{1}".format(CONSUL_HOST, CONSUL_PORT), None, None, None) + + config = _ch.get_config(DBCL_KEY_NAME) + + DMAAP_USER = config['dmaap']['username'] + DMAAP_PASS = config['dmaap']['password'] + DMAAP_OWNER = config['dmaap']['owner'] + + if 'protocol' in config['dmaap']: + DMAAP_PROTOCOL = config['dmaap']['protocol'] + else: + DMAAP_PROTOCOL = 'https' # Default to https (service discovery should give us this but doesn't + + if 'path' in config['dmaap']: + DMAAP_PATH = config['dmaap']['path'] + else: + DMAAP_PATH = 'webapi' # Should come from service discovery but Consul doesn't support it + + service_address, service_port = _ch.get_service(DBC_SERVICE_NAME) + + DMAAP_API_URL = '{0}://{1}:{2}/{3}'.format(DMAAP_PROTOCOL, service_address, service_port, DMAAP_PATH) + + +def test_add_entry(mockconsul): + _ch = ConsulHandle("http://{0}:{1}".format(CONSUL_HOST, CONSUL_PORT), None, None, None) + + key = 'DMAAP_TEST' + name = 'dmaap_test_name' + value = 'dmaap_test_value' + _ch.add_to_entry(key, name, value) + + name = "dmaap_test_name_2" + value = 'dmaap_test_value_2' + _ch.add_to_entry(key, name, value) + + _ch.delete_entry(key) diff --git a/dmaap/tests/test_dmaapcontrollerif.py b/dmaap/tests/test_dmaapcontrollerif.py new file mode 100644 index 0000000..25ddb88 --- /dev/null +++ b/dmaap/tests/test_dmaapcontrollerif.py @@ -0,0 +1,113 @@ +# ============LICENSE_START======================================================= +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2020 Pantheon.tech. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# + +import pytest +import requests +from cloudify.mocks import MockCloudifyContext +from cloudify.state import current_ctx +from cloudify import ctx +from cloudify.decorators import operation +from cloudify.exceptions import NonRecoverableError + + +import test_consulif +from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle + +import logging +logger = logging.getLogger("test_mr_lifecycle") + +_goodosv2 = { + 'auth_url': 'https://example.com/identity/v2.0', + 'password': 'pw', + 'region': 'r', + 'tenant_name': 'tn', + 'username': 'un' +} + + +def test_dmaapc (monkeypatch, mockconsul, mockdmaapbc): + from dmaapplugin.dmaaputils import random_string + + config = mockconsul().get_config('mockkey')['dmaap'] + DMAAP_API_URL = config['url'] + DMAAP_USER = config['username'] + DMAAP_PASS = config['password'] + DMAAP_OWNER = config['owner'] + + properties = {'fqdn': 'a.x.example.com', 'openstack': _goodosv2 } + mock_ctx = MockCloudifyContext( + node_id='test_node_id', + node_name='test_node_name', + properties=properties, + runtime_properties = { + "admin": { "user": "admin_user" }, + "user": { "user": "user_user" }, + "viewer": { "user": "viewer_user" } + }) + + current_ctx.set(mock_ctx) + + kwargs = { "topic_name": "ONAP_test", + "topic_description": "onap dmaap plugin unit test topic"} + + # Make sure there's a topic_name + if "topic_name" in ctx.node.properties: + topic_name = ctx.node.properties["topic_name"] + if topic_name == '' or topic_name.isspace(): + topic_name = random_string(12) + else: + topic_name = random_string(12) + + # Make sure there's a topic description + if "topic_description" in ctx.node.properties: + topic_description = ctx.node.properties["topic_description"] + else: + topic_description = "No description provided" + + # ..and the truly optional setting + if "txenable" in ctx.node.properties: + txenable = ctx.node.properties["txenable"] + else: + txenable= False + + if "replication_case" in ctx.node.properties: + replication_case = ctx.node.properties["replication_case"] + else: + replication_case = None + + if "global_mr_url" in ctx.node.properties: + global_mr_url = ctx.node.properties["global_mr_url"] + else: + global_mr_url = None + + dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger) + ctx.logger.info("Attempting to create topic name {0}".format(topic_name)) + t = dmc.create_topic(topic_name, topic_description, txenable, DMAAP_OWNER, replication_case, global_mr_url) + + # Capture important properties from the result + topic = t.json() + ctx.instance.runtime_properties["fqtn"] = topic["fqtn"] + + # test DMaaPControllerHandle functions + path = "myPath" + url = dmc._make_url(path) + rc = dmc._get_resource(path) + rc = dmc._create_resource(path, None) + rc = dmc._delete_resource(path) diff --git a/dmaap/tests/test_dr_lifecycle.py b/dmaap/tests/test_dr_lifecycle.py new file mode 100644 index 0000000..2aa65e8 --- /dev/null +++ b/dmaap/tests/test_dr_lifecycle.py @@ -0,0 +1,65 @@ +# ============LICENSE_START======================================================= +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2020 Pantheon.tech. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# + +import pytest +import requests +from cloudify.mocks import MockCloudifyContext +from cloudify.state import current_ctx +from cloudify import ctx +from cloudify.decorators import operation +from cloudify.exceptions import NonRecoverableError +from cloudify.exceptions import RecoverableError + +_goodosv2 = { + 'auth_url': 'https://example.com/identity/v2.0', + 'password': 'pw', + 'region': 'r', + 'tenant_name': 'tn', + 'username': 'un' +} + + +def test_create_feed(monkeypatch, mockconsul, mockdmaapbc): + import dmaapplugin + from dmaapplugin import dr_lifecycle + + properties = {'fqdn': 'a.x.example.com', 'openstack': _goodosv2, 'feed_id': 'test_feed_id' } + mock_ctx = MockCloudifyContext( + node_id='test_node_id', + node_name='test_node_name', + properties=properties, + runtime_properties = { + "admin": { "user": "admin_user" }, + "user": { "user": "user_user" }, + "viewer": { "user": "viewer_user" } + }) + + current_ctx.set(mock_ctx) + + kwargs = { "feed_name": "ONAP_test", + "feed_description": "onap dmaap plugin unit test feed"} + + def fake_feed(self): + return {"feedId":"test_feedId", "publishURL":"test_publishURL", "logURL":"test_logURL" } + monkeypatch.setattr(requests.Response, "json", fake_feed) + + dr_lifecycle.create_feed(**kwargs) + dr_lifecycle.get_existing_feed(**kwargs) + dr_lifecycle.delete_feed(**kwargs) diff --git a/dmaap/tests/test_mr_lifecycle.py b/dmaap/tests/test_mr_lifecycle.py new file mode 100644 index 0000000..4a6a583 --- /dev/null +++ b/dmaap/tests/test_mr_lifecycle.py @@ -0,0 +1,59 @@ +# ============LICENSE_START======================================================= +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2020 Pantheon.tech. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +import pytest +import requests +from cloudify.mocks import MockCloudifyContext +from cloudify.state import current_ctx +from cloudify import ctx +from cloudify.decorators import operation +from cloudify.exceptions import NonRecoverableError +from cloudify.exceptions import RecoverableError + +_goodosv2 = { + 'auth_url': 'https://example.com/identity/v2.0', + 'password': 'pw', + 'region': 'r', + 'tenant_name': 'tn', + 'username': 'un' +} + + +def test_create_topic(monkeypatch, mockconsul, mockdmaapbc): + import dmaapplugin + from dmaapplugin import mr_lifecycle + properties = {'fqdn': 'a.x.example.com', 'openstack': _goodosv2, 'fqtn': 'test_fqtn' } + mock_ctx = MockCloudifyContext( + node_id='test_node_id', + node_name='test_node_name', + properties=properties, + runtime_properties = { + "admin": { "user": "admin_user" }, + "user": { "user": "user_user" }, + "viewer": { "user": "viewer_user" } + }) + + current_ctx.set(mock_ctx) + + kwargs = { "topic_name": "ONAP_test", + "topic_description": "onap dmaap plugin unit test topic"} + + mr_lifecycle.create_topic(**kwargs) + mr_lifecycle.get_existing_topic(**kwargs) + mr_lifecycle.delete_topic(**kwargs) diff --git a/dmaap/tests/test_plugin.py b/dmaap/tests/test_plugin.py new file mode 100644 index 0000000..e2a8586 --- /dev/null +++ b/dmaap/tests/test_plugin.py @@ -0,0 +1,26 @@ +# ============LICENSE_START==================================================== +# org.onap.dcaegen2 +# ============================================================================= +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +import pytest +import requests +from cloudify.mocks import MockCloudifyContext +from cloudify.state import current_ctx +from cloudify.exceptions import NonRecoverableError + +def test_noop(): + pass diff --git a/dmaap/tests/test_utils.py b/dmaap/tests/test_utils.py new file mode 100644 index 0000000..362948d --- /dev/null +++ b/dmaap/tests/test_utils.py @@ -0,0 +1,26 @@ +# ============LICENSE_START======================================================= +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# + +import pytest + + +def test_random_string(monkeypatch): + from dmaapplugin import dmaaputils + target_length = 10 + assert len(dmaaputils.random_string(target_length)) == target_length diff --git a/dmaap/tox.ini b/dmaap/tox.ini new file mode 100644 index 0000000..5bcede7 --- /dev/null +++ b/dmaap/tox.ini @@ -0,0 +1,36 @@ +# ============LICENSE_START==================================================== +# org.onap.dcaegen2 +# ============================================================================= +# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. +# Copyright (c) 2020 Pantheon.tech. All rights reserved. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END====================================================== + +[tox] +envlist = py27,py36,py37,py38 +skip_missing_interpreters = true + +[testenv] +deps= + pytest + coverage + pytest-cov + -rrequirements.txt +setenv = + PYTHONPATH={toxinidir} +commands= + pytest --junitxml xunit-results.xml --cov dmaapcontrollerif --cov consulif --cov dmaapplugin --cov-report xml + coverage xml + coverage report + coverage html -- cgit 1.2.3-korg