summaryrefslogtreecommitdiffstats
path: root/dmaap/dmaapplugin/mr_relationships.py
blob: 34d02e2b48c54390d5ef2bc52e2bddd8e4ccb33b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# ============LICENSE_START====================================================
# org.onap.dcaegen2
# =============================================================================
# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
# =============================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END======================================================

from cloudify import ctx
from cloudify.decorators import operation
from cloudify.exceptions import NonRecoverableError
from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, DMAAP_OWNER, CONSUL_HOST
from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
from consulif.consulif import ConsulHandle

# Message router relationship operations

def _add_mr_client(ctype, actions):
    '''
    Adds the node represented by 'source' as a client (publisher or subscriber) to
    to topic represented by the 'target' node.  The list of actions in 'actions'
    determines whether the client is a subscriber or a publisher.

    Assumes target (the topic) has the following runtime property set
        - fqtn
    Assumes source (the client) has a runtime property whose name matches the node name of the feed.
    This is a dictionary containing the following properties:
        - location   (the dcaeLocationName to pass when adding the client to the topic)
        - client_role (the AAF client role under which the client will access the topic)
    Adds two properties to the dictionary above:
        - topic_url (the URL that the client can use to access the topic)
        - client_id  (used to delete the client in the uninstall workflow)
    '''
    try:
        # Make sure we have a name under which to store DMaaP configuration
        # Check early so we don't needlessly create DMaaP entities
        if 'service_component_name' not in ctx.source.instance.runtime_properties:
            raise Exception("Source node does not have 'service_component_name' in runtime_properties")

        target_topic = ctx.target.node.id           # Key for the source's dictionary with topic-related info
        fqtn = ctx.target.instance.runtime_properties["fqtn"]
        ctx.logger.info("Attempting to add {0} as {1} to topic {2}".format(ctx.source.node.id, ctype, fqtn))

        # Get the parameters needed for adding the client
        location = ctx.source.instance.runtime_properties[target_topic]["location"]
        client_role = ctx.source.instance.runtime_properties[target_topic]["client_role"]

        # Make the request to add the client to the topic
        dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
        c = dmc.create_client(fqtn, location, client_role, actions)
        c.raise_for_status()
        client_info = c.json()
        client_id = client_info["mrClientId"]
        topic_url = client_info["topicURL"]

        # Update source's runtime properties
        #ctx.source.instance.runtime_properties[target_topic]["topic_url"] = topic_url
        #ctx.source.instance.runtime_properties[target_topic]["client_id"] = client_id
        ctx.source.instance.runtime_properties[target_topic] = {
            "topic_url" : topic_url,
            "client_id" : client_id,
            "location" : location,
            "client_role" : client_role
        }

        ctx.logger.info("Added {0} id {1} to feed {2} at {3}".format(ctype, client_id, fqtn, location))

        # Set key in Consul
        ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
        ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_topic, ctx.source.instance.runtime_properties[target_topic])

    except Exception as e:
        ctx.logger.error("Error adding client to feed: {er}".format(er=e))
        raise NonRecoverableError(e)

@operation
def add_mr_publisher(**kwargs):
    _add_mr_client("publisher", ["view", "pub"])

@operation
def add_mr_subscriber(**kwargs):
        _add_mr_client("subscriber", ["view", "sub"])

@operation
def delete_mr_client(**kwargs):
    '''
    Delete the client (publisher or subscriber).
    Expect property 'client_id' to have been set in the instance's runtime_properties
    when the client was created.
    '''
    try:
        target_topic = ctx.target.node.id
        client_id = ctx.source.instance.runtime_properties[target_topic]["client_id"]
        ctx.logger.info("Attempting to delete client {0} ".format(client_id))
        dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
        c = dmc.delete_client(client_id)
        c.raise_for_status()

        ctx.logger.info("Deleted client {0}".format(client_id))

        # Attempt to remove the entire ":dmaap" entry from the Consul KV store
        # Will quietly do nothing if the entry has already been removed
        ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
        ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']))

    except Exception as e:
        ctx.logger.error("Error deleting MR client: {er}".format(er=e))
        # don't raise a NonRecoverable error here--let the uninstall workflow continue