diff options
Diffstat (limited to 'python-discovery-client/discovery_client/discovery.py')
-rw-r--r-- | python-discovery-client/discovery_client/discovery.py | 368 |
1 files changed, 368 insertions, 0 deletions
diff --git a/python-discovery-client/discovery_client/discovery.py b/python-discovery-client/discovery_client/discovery.py new file mode 100644 index 0000000..180e933 --- /dev/null +++ b/python-discovery-client/discovery_client/discovery.py @@ -0,0 +1,368 @@ +# org.onap.dcae +# ================================================================================ +# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. + +import time, json, os, re, logging +from itertools import chain +from functools import partial +import requests +import consul +import six +from discovery_client import util + + +_logger = util.get_logger(__name__) + +class DiscoveryInitError(RuntimeError): + pass + +class DiscoveryRegistrationError(RuntimeError): + pass + +class DiscoveryResolvingNameError(RuntimeError): + pass + + +##### +# Consul calls for services +##### + +def _get_configuration_from_consul(consul_handle, service_name): + index = None + while True: + index, data = consul_handle.kv.get(service_name, index=index) + + if data: + return json.loads(data["Value"].decode("utf-8")) + else: + _logger.warn("No configuration found for {0}. Try again in a bit." + .format(service_name)) + time.sleep(5) + +def _get_relationships_from_consul(consul_handle, service_name): + """Fetch the relationship information from Consul for a service by service + name. Returns a list of service names.""" + index = None + rel_key = "{0}:rel".format(service_name) + while True: + index, data = consul_handle.kv.get(rel_key, index=index) + + if data: + return json.loads(data["Value"].decode("utf-8")) + else: + _logger.warn("No relationships found for {0}. Try again in a bit." + .format(service_name)) + time.sleep(5) + +def _lookup_with_consul(consul_handle, service_name, max_attempts=0): + num_attempts = 1 + + while True: + index, results = consul_handle.catalog.service(service_name) + + if results: + return results + else: + num_attempts += 1 + + if max_attempts > 0 and max_attempts < num_attempts: + return None + + _logger.warn("Service not found {0}. Trying again in a bit." + .format(service_name)) + time.sleep(5) + +def _register_with_consul(consul_handle, service_name, service_ip, service_port, + health_endpoint): + # https://www.consul.io/docs/agent/http/agent.html#agent_service_register + # Note: Unhealthy services should not return in queries i.e. + # dig @127.0.0.1 -p 8600 foo.service.consul + health_url = "http://{0}:{1}/{2}".format(service_ip, service_port, health_endpoint) + return consul_handle.agent.service.register(service_name, address=service_ip, + port=service_port, check= { "HTTP": health_url, "Interval": "5s" }) + +##### +# Config binding service call +##### + +def _get_configuration_resolved_from_cbs(consul_handle, service_name): + """ + This is what a minimal python client library that wraps the CBS would look like. + POSSIBLE TODO: break this out into pypi repo + + This call does not raise an exception if Consul or the CBS cannot complete the request. + It logs an error and returns {} if the config is not bindable. + It could be a temporary network outage. Call me again later. + + It will raise an exception if the necessary env parameters were not set because that is irrecoverable. + This function is called in my /heatlhcheck, so this will be caught early. + """ + config = {} + + results = _lookup_with_consul(consul_handle, "config_binding_service", + max_attempts=5) + + if results is None: + logger.error("Cannot bind config at this time, cbs is unreachable") + else: + cbs_hostname = results[0]["ServiceAddress"] + cbs_port = results[0]["ServicePort"] + cbs_url = "http://{hostname}:{port}".format(hostname=cbs_hostname, port=cbs_port) + + #get my config + my_config_endpoint = "{0}/service_component/{1}".format(cbs_url, + service_name) + res = requests.get(my_config_endpoint) + try: + res.raise_for_status() + config = res.json() + _logger.info("get_config returned the following configuration: {0}".format(json.dumps(config))) + except: + _logger.error("in get_config, the config binding service endpoint {0} blew up on me. Error code: {1}, Error text: {2}".format(my_config_endpoint, res.status_code, res.text)) + return config + +##### +# Functionality for putting together service's configuration +##### + +def _get_connection_types(config): + """Get all the connection types for a given configuration json + + Crawls through the entire config dict recursively and returns the entries + that have been identified as service connections in the form of a list of tuples - + + [(config key, component type), ..] + + where "config key" is a compound key in the form of a tuple. Each entry in + the compound key is a key to a level within the json data structure.""" + def grab_component_type(v): + # To support Python2, unicode strings are not type `str`. Specifically, + # the config string values from Consul maybe encoded to utf-8 so better + # be prepared. + if isinstance(v, six.string_types): + # Regex matches on strings like "{{foo}}" and "{{ BAR }}" and + # extracts the alphanumeric string inside the parantheses. + result = re.match("^{{\s*([-_.\w]*)\s*}}", v) + return result.group(1) if result else None + + def crawl(config, parent_key=()): + if isinstance(config, dict): + rels = [ crawl(value, parent_key + (key, )) + for key, value in config.items() ] + rels = chain(*rels) + elif isinstance(config, list): + rels = [ crawl(config[index], parent_key + (index, )) + for index in range(0, len(config)) ] + rels = chain(*rels) + else: + rels = [(parent_key, grab_component_type(config))] + + # Filter out the entries with Nones + rels = [(key, rel) for key, rel in rels if rel] + return rels + + return crawl(config) + +def _has_connections(config): + return True if _get_connection_types(config) else False + +def _resolve_connection_types(service_name, connection_types, relationships): + + def find_match(connection_type): + ret_list = [] + for rel in relationships: + if connection_type in rel: + ret_list.append(rel) + return ret_list + + return [ (key, find_match(connection_type)) + for key, connection_type in connection_types ] + +def _resolve_name(lookup_func, service_name): + """Resolves the service component name to detailed connection information + + Currently this is grouped into two ways: + 1. CDAP applications take a two step approach - call Consul then call the + CDAP broker + 2. All other applications just call Consul to get IP and port + + Args: + ---- + lookup_func: fn(string) -> list of dicts + The function should return a list of dicts that have "ServiceAddress" and + "ServicePort" key value entries + service_name: (string) service name to lookup + + Return depends upon the connection type: + 1. CDAP applications return a dict + 2. All other applications return a string + """ + def handle_result(result): + ip = result["ServiceAddress"] + port = result["ServicePort"] + + if not (ip and port): + raise DiscoveryResolvingNameError( + "Failed to resolve name for {0}: ip, port not set".format(service_name)) + + # TODO: Need a better way to identify CDAP apps. Really need to make this + # better. + if "platform-" in service_name: + return "{0}:{1}".format(ip, port) + elif "cdap" in service_name: + redirectish_url = "http://{0}:{1}/application/{2}".format(ip, port, + service_name) + + r = requests.get(redirectish_url) + r.raise_for_status() + details = r.json() + # Pick out the details to expose to the component developers + return { key: details[key] + for key in ["connectionurl", "serviceendpoints"] } + else: + return "{0}:{1}".format(ip, port) + + try: + results = lookup_func(service_name) + return [ handle_result(result) for result in results ] + except Exception as e: + raise DiscoveryResolvingNameError( + "Failed to resolve name for {0}: {1}".format(service_name, e)) + +def _resolve_configuration_dict(ch, service_name, config): + """ + Helper used by both resolve_configuration_dict and get_configuration + """ + if _has_connections(config): + rels = _get_relationships_from_consul(ch, service_name) + connection_types = _get_connection_types(config) + connection_names = _resolve_connection_types(service_name, connection_types, rels) + # NOTE: The hardcoded use of the first element. This is to keep things backwards + # compatible since resolve name now returns a list. + for key, conn in [(key, [_resolve_name(partial(_lookup_with_consul, ch), name)[0] for name in names]) for key, names in connection_names]: + config = util.update_json(config, key, conn) + + _logger.info("Generated config: {0}".format(config)) + return config + +##### +# Public calls +##### + +def get_consul_hostname(consul_hostname_override=None): + """Get the Consul hostname""" + try: + return consul_hostname_override \ + if consul_hostname_override else os.environ["CONSUL_HOST"] + except: + raise DiscoveryInitError("CONSUL_HOST variable has not been set!") + +def get_service_name(): + """Get the full service name + + This is expected to be given from whatever entity is starting this service + and given by an environment variable called "HOSTNAME".""" + try: + return os.environ["HOSTNAME"] + except: + raise DiscoveryInitError("HOSTNAME variable has not been set!") + + +def resolve_name(consul_host, service_name, max_attempts=3): + """Resolve the service name + + Do a service discovery lookup from Consul and return back the detailed connection + information. + + Returns: + -------- + For CDAP apps, returns a dict. All others a string with the format "<ip>:<port>" + """ + ch = consul.Consul(host=consul_host) + lookup_func = partial(_lookup_with_consul, ch, max_attempts=max_attempts) + return _resolve_name(lookup_func, service_name) + + +def resolve_configuration_dict(consul_host, service_name, config): + """ + Utility method for taking a given service_name, and config dict, and resolving it + """ + ch = consul.Consul(host=consul_host) + return _resolve_configuration_dict(ch, service_name, config) + + +def get_configuration(override_consul_hostname=None, override_service_name=None, + from_cbs=True): + """Provides this service component's configuration information fully resolved + + This method can either resolve the configuration locally here or make a + remote call to the config binding service. The default is to use the config + binding service. + + Args: + ----- + override_consul_hostname (string): Consul hostname to use rather than the one + set by the environment variable CONSUL_HOST + override_service_name (string): Use this name over the name set on the + HOSTNAME environment variable. Default is None. + from_cbs (boolean): True (default) means use the config binding service otherwise + set to False to have the config pulled and resolved by this library + + Returns the fully resolved service component configuration as a dict + """ + # Get config, bootstrap + consul_hostname = get_consul_hostname(override_consul_hostname) + # NOTE: We use the default port 8500 + ch = consul.Consul(host=consul_hostname) + service_name = override_service_name if override_service_name else get_service_name() + _logger.info("service name: {0}".format(service_name)) + + if from_cbs: + return _get_configuration_resolved_from_cbs(ch, service_name) + else: + # The following will happen: + # + # 1. Fetching the configuration by service component name from Consul + # 2. Fetching the relationships for this service component by service component + # name + # 3. Pick out the connection types from the templetized fields in the configuration + # 4. Resolve the connection types with connection names using the step #2 + # information + # 5. Resolve the connection names with the actual connection via queries to + # Consul using the connection name + config = _get_configuration_from_consul(ch, service_name) + return _resolve_configuration_dict(ch, service_name, config) + + +def register_for_discovery(consul_host, service_ip, service_port): + """Register the service component for service discovery + + This is required in order for other services to "discover" you so that you + can service their requests. + + NOTE: Applications may not need to make this call depending upon if the + environment is using Registrator. + """ + ch = consul.Consul(host=consul_host) + service_name = get_service_name() + + if _register_with_consul(ch, service_name, service_ip, service_port, "health"): + _logger.info("Registered to consul: {0}".format(service_name)) + else: + _logger.error("Failed to register to consul: {0}".format(service_name)) + raise DiscoveryRegistrationError() |