summaryrefslogtreecommitdiffstats
path: root/dmaap/dmaapplugin/mr_relationships.py
diff options
context:
space:
mode:
Diffstat (limited to 'dmaap/dmaapplugin/mr_relationships.py')
-rw-r--r--dmaap/dmaapplugin/mr_relationships.py119
1 files changed, 119 insertions, 0 deletions
diff --git a/dmaap/dmaapplugin/mr_relationships.py b/dmaap/dmaapplugin/mr_relationships.py
new file mode 100644
index 0000000..34d02e2
--- /dev/null
+++ b/dmaap/dmaapplugin/mr_relationships.py
@@ -0,0 +1,119 @@
+# ============LICENSE_START====================================================
+# org.onap.dcaegen2
+# =============================================================================
+# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+from cloudify import ctx
+from cloudify.decorators import operation
+from cloudify.exceptions import NonRecoverableError
+from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, DMAAP_OWNER, CONSUL_HOST
+from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
+from consulif.consulif import ConsulHandle
+
+# Message router relationship operations
+
+def _add_mr_client(ctype, actions):
+ '''
+ Adds the node represented by 'source' as a client (publisher or subscriber) to
+ to topic represented by the 'target' node. The list of actions in 'actions'
+ determines whether the client is a subscriber or a publisher.
+
+ Assumes target (the topic) has the following runtime property set
+ - fqtn
+ Assumes source (the client) has a runtime property whose name matches the node name of the feed.
+ This is a dictionary containing the following properties:
+ - location (the dcaeLocationName to pass when adding the client to the topic)
+ - client_role (the AAF client role under which the client will access the topic)
+ Adds two properties to the dictionary above:
+ - topic_url (the URL that the client can use to access the topic)
+ - client_id (used to delete the client in the uninstall workflow)
+ '''
+ try:
+ # Make sure we have a name under which to store DMaaP configuration
+ # Check early so we don't needlessly create DMaaP entities
+ if 'service_component_name' not in ctx.source.instance.runtime_properties:
+ raise Exception("Source node does not have 'service_component_name' in runtime_properties")
+
+ target_topic = ctx.target.node.id # Key for the source's dictionary with topic-related info
+ fqtn = ctx.target.instance.runtime_properties["fqtn"]
+ ctx.logger.info("Attempting to add {0} as {1} to topic {2}".format(ctx.source.node.id, ctype, fqtn))
+
+ # Get the parameters needed for adding the client
+ location = ctx.source.instance.runtime_properties[target_topic]["location"]
+ client_role = ctx.source.instance.runtime_properties[target_topic]["client_role"]
+
+ # Make the request to add the client to the topic
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ c = dmc.create_client(fqtn, location, client_role, actions)
+ c.raise_for_status()
+ client_info = c.json()
+ client_id = client_info["mrClientId"]
+ topic_url = client_info["topicURL"]
+
+ # Update source's runtime properties
+ #ctx.source.instance.runtime_properties[target_topic]["topic_url"] = topic_url
+ #ctx.source.instance.runtime_properties[target_topic]["client_id"] = client_id
+ ctx.source.instance.runtime_properties[target_topic] = {
+ "topic_url" : topic_url,
+ "client_id" : client_id,
+ "location" : location,
+ "client_role" : client_role
+ }
+
+ ctx.logger.info("Added {0} id {1} to feed {2} at {3}".format(ctype, client_id, fqtn, location))
+
+ # Set key in Consul
+ ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
+ ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_topic, ctx.source.instance.runtime_properties[target_topic])
+
+ except Exception as e:
+ ctx.logger.error("Error adding client to feed: {er}".format(er=e))
+ raise NonRecoverableError(e)
+
+@operation
+def add_mr_publisher(**kwargs):
+ _add_mr_client("publisher", ["view", "pub"])
+
+@operation
+def add_mr_subscriber(**kwargs):
+ _add_mr_client("subscriber", ["view", "sub"])
+
+@operation
+def delete_mr_client(**kwargs):
+ '''
+ Delete the client (publisher or subscriber).
+ Expect property 'client_id' to have been set in the instance's runtime_properties
+ when the client was created.
+ '''
+ try:
+ target_topic = ctx.target.node.id
+ client_id = ctx.source.instance.runtime_properties[target_topic]["client_id"]
+ ctx.logger.info("Attempting to delete client {0} ".format(client_id))
+ dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ c = dmc.delete_client(client_id)
+ c.raise_for_status()
+
+ ctx.logger.info("Deleted client {0}".format(client_id))
+
+ # Attempt to remove the entire ":dmaap" entry from the Consul KV store
+ # Will quietly do nothing if the entry has already been removed
+ ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
+ ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']))
+
+ except Exception as e:
+ ctx.logger.error("Error deleting MR client: {er}".format(er=e))
+ # don't raise a NonRecoverable error here--let the uninstall workflow continue
+