summaryrefslogtreecommitdiffstats
path: root/python-discovery-client/discovery_client/discovery.py
blob: 180e933237b321d751cda633937aca7123643557 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
# org.onap.dcae
# ================================================================================
# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.

import time, json, os, re, logging
from itertools import chain
from functools import partial
import requests
import consul
import six
from discovery_client import util


_logger = util.get_logger(__name__)

class DiscoveryInitError(RuntimeError):
    pass

class DiscoveryRegistrationError(RuntimeError):
    pass

class DiscoveryResolvingNameError(RuntimeError):
    pass


#####
# Consul calls for services
#####

def _get_configuration_from_consul(consul_handle, service_name):
    index = None
    while True:
        index, data = consul_handle.kv.get(service_name, index=index)

        if data:
            return json.loads(data["Value"].decode("utf-8"))
        else:
            _logger.warn("No configuration found for {0}. Try again in a bit."
                    .format(service_name))
            time.sleep(5)

def _get_relationships_from_consul(consul_handle, service_name):
    """Fetch the relationship information from Consul for a service by service
    name.  Returns a list of service names."""
    index = None
    rel_key = "{0}:rel".format(service_name)
    while True:
        index, data = consul_handle.kv.get(rel_key, index=index)

        if data:
            return json.loads(data["Value"].decode("utf-8"))
        else:
            _logger.warn("No relationships found for {0}. Try again in a bit."
                    .format(service_name))
            time.sleep(5)

def _lookup_with_consul(consul_handle, service_name, max_attempts=0):
    num_attempts = 1

    while True:
        index, results = consul_handle.catalog.service(service_name)

        if results:
            return results
        else:
            num_attempts += 1

            if max_attempts > 0 and max_attempts < num_attempts:
                return None

            _logger.warn("Service not found {0}. Trying again in a bit."
                    .format(service_name))
            time.sleep(5)

def _register_with_consul(consul_handle, service_name, service_ip, service_port,
        health_endpoint):
    # https://www.consul.io/docs/agent/http/agent.html#agent_service_register
    # Note: Unhealthy services should not return in queries i.e.
    # dig @127.0.0.1 -p 8600 foo.service.consul
    health_url = "http://{0}:{1}/{2}".format(service_ip, service_port, health_endpoint)
    return consul_handle.agent.service.register(service_name, address=service_ip,
            port=service_port, check= { "HTTP": health_url, "Interval": "5s" })

#####
# Config binding service call
#####

def _get_configuration_resolved_from_cbs(consul_handle, service_name):
    """
    This is what a minimal python client library that wraps the CBS would look like.
    POSSIBLE TODO: break this out into pypi repo

    This call does not raise an exception if Consul or the CBS cannot complete the request.
    It logs an error and returns {} if the config is not bindable. 
    It could be a temporary network outage. Call me again later. 

    It will raise an exception if the necessary env parameters were not set because that is irrecoverable.
    This function is called in my /heatlhcheck, so this will be caught early.
    """
    config = {}

    results = _lookup_with_consul(consul_handle, "config_binding_service",
            max_attempts=5)

    if results is None:
        logger.error("Cannot bind config at this time, cbs is unreachable")
    else:
        cbs_hostname = results[0]["ServiceAddress"]
        cbs_port = results[0]["ServicePort"]
        cbs_url = "http://{hostname}:{port}".format(hostname=cbs_hostname, port=cbs_port)

        #get my config
        my_config_endpoint = "{0}/service_component/{1}".format(cbs_url,
                service_name)
        res = requests.get(my_config_endpoint)
        try:
            res.raise_for_status()
            config = res.json()
            _logger.info("get_config returned the following configuration: {0}".format(json.dumps(config)))
        except:
            _logger.error("in get_config, the config binding service endpoint {0} blew up on me. Error code: {1}, Error text: {2}".format(my_config_endpoint, res.status_code, res.text))
    return config

#####
# Functionality for putting together service's configuration
#####

def _get_connection_types(config):
    """Get all the connection types for a given configuration json

    Crawls through the entire config dict recursively and returns the entries
    that have been identified as service connections in the form of a list of tuples -

        [(config key, component type), ..]

    where "config key" is a compound key in the form of a tuple. Each entry in
    the compound key is a key to a level within the json data structure."""
    def grab_component_type(v):
        # To support Python2, unicode strings are not type `str`. Specifically,
        # the config string values from Consul maybe encoded to utf-8 so better
        # be prepared.
        if isinstance(v, six.string_types):
            # Regex matches on strings like "{{foo}}" and "{{ BAR }}" and
            # extracts the alphanumeric string inside the parantheses.
            result = re.match("^{{\s*([-_.\w]*)\s*}}", v)
            return result.group(1) if result else None

    def crawl(config, parent_key=()):
        if isinstance(config, dict):
            rels = [ crawl(value, parent_key + (key, ))
                    for key, value in config.items() ]
            rels = chain(*rels)
        elif isinstance(config, list):
            rels = [ crawl(config[index], parent_key + (index, ))
                    for index in range(0, len(config)) ]
            rels = chain(*rels)
        else:
            rels = [(parent_key, grab_component_type(config))]

        # Filter out the entries with Nones
        rels = [(key, rel) for key, rel in rels if rel]
        return rels

    return crawl(config)

def _has_connections(config):
    return True if _get_connection_types(config) else False

def _resolve_connection_types(service_name, connection_types, relationships):

    def find_match(connection_type):
        ret_list = []
        for rel in relationships:
            if connection_type in rel:
                ret_list.append(rel)
        return ret_list

    return [ (key, find_match(connection_type))
            for key, connection_type in connection_types ]

def _resolve_name(lookup_func, service_name):
    """Resolves the service component name to detailed connection information

    Currently this is grouped into two ways:
    1. CDAP applications take a two step approach - call Consul then call the
    CDAP broker
    2. All other applications just call Consul to get IP and port

    Args:
    ----
    lookup_func: fn(string) -> list of dicts
        The function should return a list of dicts that have "ServiceAddress" and
        "ServicePort" key value entries
    service_name: (string) service name to lookup

    Return depends upon the connection type:
    1. CDAP applications return a dict
    2. All other applications return a string
    """
    def handle_result(result):
        ip = result["ServiceAddress"]
        port = result["ServicePort"]

        if not (ip and port):
            raise DiscoveryResolvingNameError(
                    "Failed to resolve name for {0}: ip, port not set".format(service_name))

        # TODO: Need a better way to identify CDAP apps. Really need to make this
        # better.
        if "platform-" in service_name:
            return "{0}:{1}".format(ip, port)
        elif "cdap" in service_name:
            redirectish_url = "http://{0}:{1}/application/{2}".format(ip, port,
                    service_name)

            r = requests.get(redirectish_url)
            r.raise_for_status()
            details = r.json()
            # Pick out the details to expose to the component developers
            return { key: details[key]
                    for key in ["connectionurl", "serviceendpoints"] }
        else:
            return "{0}:{1}".format(ip, port)

    try:
        results = lookup_func(service_name)
        return [ handle_result(result) for result in results ]
    except Exception as e:
        raise DiscoveryResolvingNameError(
                "Failed to resolve name for {0}: {1}".format(service_name, e))

def _resolve_configuration_dict(ch, service_name, config):
    """
    Helper used by both resolve_configuration_dict and get_configuration
    """
    if _has_connections(config):
        rels = _get_relationships_from_consul(ch, service_name)
        connection_types = _get_connection_types(config)
        connection_names = _resolve_connection_types(service_name, connection_types, rels)
        # NOTE: The hardcoded use of the first element. This is to keep things backwards
        # compatible since resolve name now returns a list.
        for key, conn in [(key, [_resolve_name(partial(_lookup_with_consul, ch), name)[0] for name in names]) for key, names in connection_names]:
            config = util.update_json(config, key, conn)

    _logger.info("Generated config: {0}".format(config))
    return config

#####
# Public calls 
#####

def get_consul_hostname(consul_hostname_override=None):
    """Get the Consul hostname"""
    try:
        return consul_hostname_override \
                if consul_hostname_override else os.environ["CONSUL_HOST"]
    except:
        raise DiscoveryInitError("CONSUL_HOST variable has not been set!")

def get_service_name():
    """Get the full service name

    This is expected to be given from whatever entity is starting this service
    and given by an environment variable called "HOSTNAME"."""
    try:
        return os.environ["HOSTNAME"]
    except:
        raise DiscoveryInitError("HOSTNAME variable has not been set!")


def resolve_name(consul_host, service_name, max_attempts=3):
    """Resolve the service name

    Do a service discovery lookup from Consul and return back the detailed connection
    information.

    Returns:
    --------
    For CDAP apps, returns a dict. All others a string with the format "<ip>:<port>"
    """
    ch = consul.Consul(host=consul_host)
    lookup_func = partial(_lookup_with_consul, ch, max_attempts=max_attempts)
    return _resolve_name(lookup_func, service_name)


def resolve_configuration_dict(consul_host, service_name, config):
    """
    Utility method for taking a given service_name, and config dict, and resolving it
    """
    ch = consul.Consul(host=consul_host)
    return _resolve_configuration_dict(ch, service_name, config)


def get_configuration(override_consul_hostname=None, override_service_name=None,
        from_cbs=True):
    """Provides this service component's configuration information fully resolved

    This method can either resolve the configuration locally here or make a
    remote call to the config binding service. The default is to use the config
    binding service.

    Args:
    -----
    override_consul_hostname (string): Consul hostname to use rather than the one
        set by the environment variable CONSUL_HOST
    override_service_name (string): Use this name over the name set on the
        HOSTNAME environment variable. Default is None.
    from_cbs (boolean): True (default) means use the config binding service otherwise
        set to False to have the config pulled and resolved by this library

    Returns the fully resolved service component configuration as a dict
    """
    # Get config, bootstrap
    consul_hostname = get_consul_hostname(override_consul_hostname)
    # NOTE: We use the default port 8500
    ch = consul.Consul(host=consul_hostname)
    service_name = override_service_name if override_service_name else get_service_name()
    _logger.info("service name: {0}".format(service_name))

    if from_cbs:
        return _get_configuration_resolved_from_cbs(ch, service_name)
    else:
        # The following will happen:
        #
        # 1. Fetching the configuration by service component name from Consul
        # 2. Fetching the relationships for this service component by service component
        # name
        # 3. Pick out the connection types from the templetized fields in the configuration
        # 4. Resolve the connection types with connection names using the step #2
        # information
        # 5. Resolve the connection names with the actual connection via queries to
        # Consul using the connection name
        config = _get_configuration_from_consul(ch, service_name)
        return _resolve_configuration_dict(ch, service_name, config)


def register_for_discovery(consul_host, service_ip, service_port):
    """Register the service component for service discovery

    This is required in order for other services to "discover" you so that you
    can service their requests.

    NOTE: Applications may not need to make this call depending upon if the
    environment is using Registrator.
    """
    ch = consul.Consul(host=consul_host)
    service_name = get_service_name()

    if _register_with_consul(ch, service_name, service_ip, service_port, "health"):
        _logger.info("Registered to consul: {0}".format(service_name))
    else:
        _logger.error("Failed to register to consul: {0}".format(service_name))
        raise DiscoveryRegistrationError()