summaryrefslogtreecommitdiffstats
path: root/dmaap/dmaapplugin/dr_relationships.py
blob: eff0fa1a15fc11efa6d0d2c261ab28c433bea1e4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# ============LICENSE_START====================================================
# org.onap.ccsdk
# =============================================================================
# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
# =============================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END======================================================

from cloudify import ctx
from cloudify.decorators import operation
from cloudify.exceptions import NonRecoverableError
from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, CONSUL_HOST
from dmaaputils import random_string
from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
from consulif.consulif import ConsulHandle

# Lifecycle operations for DMaaP Data Router
# publish and subscribe relationships

@operation
def add_dr_publisher(**kwargs):
    '''
    Sets up the source of the publishes_relationship as a publisher to the feed that
    is the target of the relationship
        Assumes target (the feed) has the following runtime properties set
            - feed_id
            - log_url
            - publish_url
        Assumes source (the publisher) has a runtime property whose name matches the node name of the feed.
        This is a dictionary containing one property:
            - location   (the dcaeLocationName to pass when adding the publisher to the feed)
        Generates a user name and password that the publisher will need to use when publishing
        Adds the following properties to the dictionary above:
             - publish_url
             - log_url
             - username
             - password
    '''
    try:
        # Make sure we have a name under which to store DMaaP configuration
        # Check early so we don't needlessly create DMaaP entities
        if 'service_component_name' not in ctx.source.instance.runtime_properties:
            raise Exception("Source node does not have 'service_component_name' in runtime_properties")

        target_feed = ctx.target.node.id
        ctx.logger.info("Attempting to add publisher {0} to feed {1}".format(ctx.source.node.id, target_feed))

        # Set up the parameters for the add_publisher request to the DMaaP bus controller
        feed_id = ctx.target.instance.runtime_properties["feed_id"]
        location = ctx.source.instance.runtime_properties[target_feed]["location"]
        username = random_string(8)
        password = random_string(16)

        # Make the request to add the publisher to the feed
        dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
        add_pub = dmc.add_publisher(feed_id, location, username, password)
        add_pub.raise_for_status()
        publisher_info = add_pub.json()
        publisher_id = publisher_info["pubId"]
        ctx.logger.info("Added publisher id {0} to feed {1} at {2}, with user {3}, pass {4}".format(publisher_id, feed_id, location, username, password))

        # Set runtime properties on the source
        ctx.source.instance.runtime_properties[target_feed] = {
           "publisher_id" : publisher_id,
           "location" : location,
           "publish_url" : ctx.target.instance.runtime_properties["publish_url"],
           "log_url" : ctx.target.instance.runtime_properties["log_url"],
           "username" : username,
           "password" : password
        }

        # Set key in Consul
        ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
        cpy = dict(ctx.source.instance.runtime_properties[target_feed])
        # cpy["password"] = pkcrypto.encrypt_string(cpy["password"])  # can't encrypt until collectors can decrypt
        ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_feed, cpy)

    except Exception as e:
        ctx.logger.error("Error adding publisher to feed: {er}".format(er=e))
        raise NonRecoverableError(e)


@operation
def delete_dr_publisher(**kwargs):
    '''
    Deletes publisher (the source of the publishes_files relationship)
    from the feed (the target of the relationship).
    Assumes that the 'publisher_id' property was added to the dictionary of feed-related properties,
    when the publisher was added to the feed.
    '''

    try:
        # Make sure we have a name under which to store DMaaP configuration
        # Check early so we don't needlessly create DMaaP entities
        if 'service_component_name' not in ctx.source.instance.runtime_properties:
            raise Exception("Source node does not have 'service_component_name' in runtime_properties")

        # Get the publisher id
        target_feed = ctx.target.node.id
        publisher_id = ctx.source.instance.runtime_properties[target_feed]["publisher_id"]
        ctx.logger.info("Attempting to delete publisher {0}".format(publisher_id, target_feed))

        # Make the request
        dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
        del_result = dmc.delete_publisher(publisher_id)
        del_result.raise_for_status()

        ctx.logger.info("Deleted publisher {0}".format(publisher_id))

        # Attempt to remove the entire ":dmaap" entry from the Consul KV store
        # Will quietly do nothing if the entry has already been removed
        ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
        ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']))

    except Exception as e:
        ctx.logger.error("Error deleting publisher: {er}".format(er=e))
        # don't raise a NonRecoverable error here--let the uninstall workflow continue


@operation
def add_dr_subscriber(**kwargs):
    '''
    Sets up the source of the subscribes_to_files relationship as a subscriber to the
    feed that is the target of the relationship.
    Assumes target (the feed) has the following runtime property set
        - feed_id
    Assumes source (the subscriber) has a runtime property whose name matches the node name of the feed.
    This is a dictionary containing the following properties:
        - location   (the dcaeLocationName to pass when adding the publisher to the feed)
        - delivery_url (the URL to which data router will deliver files)
        - username (the username data router will use when delivering files)
        - password (the password data router will use when delivering files)
    Adds a property to the dictionary above:
        - subscriber_id  (used to delete the subscriber in the uninstall workflow
    '''
    try:
        target_feed = ctx.target.node.id
        ctx.logger.info("Attempting to add subscriber {0} to feed {1}".format(ctx.source.node.id, target_feed))

        # Get the parameters for the call
        feed_id = ctx.target.instance.runtime_properties["feed_id"]
        location = ctx.source.instance.runtime_properties[target_feed]["location"]
        delivery_url = ctx.source.instance.runtime_properties[target_feed]["delivery_url"]
        username = ctx.source.instance.runtime_properties[target_feed]["username"]
        password = ctx.source.instance.runtime_properties[target_feed]["password"]

        # Make the request to add the subscriber to the feed
        dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
        add_sub = dmc.add_subscriber(feed_id, location, delivery_url,username, password)
        add_sub.raise_for_status()
        subscriber_info = add_sub.json()
        subscriber_id = subscriber_info["subId"]
        ctx.logger.info("Added subscriber id {0} to feed {1} at {2}".format(subscriber_id, feed_id, location))

        # Add subscriber_id to the runtime properties
        # ctx.source.instance.runtime_properties[target_feed]["subscriber_id"] = subscriber_id
        ctx.source.instance.runtime_properties[target_feed] = {
            "subscriber_id": subscriber_id,
            "location" : location,
            "delivery_url" : delivery_url,
            "username" : username,
            "password" : password
        }
        ctx.logger.info("on source: {0}".format(ctx.source.instance.runtime_properties[target_feed]))

        # Set key in Consul
        ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
        cpy = dict(ctx.source.instance.runtime_properties[target_feed])
        # cpy["password"] = pkcrypto.encrypt_string(cpy["password"])  # can't encrypt until collectors can decrypt
        ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_feed, cpy)

    except Exception as e:
        ctx.logger.error("Error adding subscriber to feed: {er}".format(er=e))
        raise NonRecoverableError(e)


@operation
def delete_dr_subscriber(**kwargs):
    '''
    Deletes subscriber (the source of the subscribes_to_files relationship)
    from the feed (the target of the relationship).
    Assumes that the source node's runtime properties dictionary for the target feed
    includes 'subscriber_id', set when the publisher was added to the feed.
    '''
    try:
        # Get the subscriber id
        target_feed = ctx.target.node.id
        subscriber_id = ctx.source.instance.runtime_properties[target_feed]["subscriber_id"]
        ctx.logger.info("Attempting to delete subscriber {0} from feed {1}".format(subscriber_id, target_feed))

        # Make the request
        dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
        del_result = dmc.delete_subscriber(subscriber_id)
        del_result.raise_for_status()

        ctx.logger.info("Deleted subscriber {0}".format(subscriber_id))

        # Attempt to remove the entire ":dmaap" entry from the Consul KV store
        # Will quietly do nothing if the entry has already been removed
        ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
        ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']))

    except Exception as e:
        ctx.logger.error("Error deleting subscriber: {er}".format(er=e))
        # don't raise a NonRecoverable error here--let the uninstall workflow continue