# org.onap.dcae # ================================================================================ # Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============LICENSE_END========================================================= # # ECOMP is a trademark and service mark of AT&T Intellectual Property. import time, json, os, re, logging from itertools import chain from functools import partial import requests import consul import six from discovery_client import util _logger = util.get_logger(__name__) class DiscoveryInitError(RuntimeError): pass class DiscoveryRegistrationError(RuntimeError): pass class DiscoveryResolvingNameError(RuntimeError): pass ##### # Consul calls for services ##### def _get_configuration_from_consul(consul_handle, service_name): index = None while True: index, data = consul_handle.kv.get(service_name, index=index) if data: return json.loads(data["Value"].decode("utf-8")) else: _logger.warn("No configuration found for {0}. Try again in a bit." .format(service_name)) time.sleep(5) def _get_relationships_from_consul(consul_handle, service_name): """Fetch the relationship information from Consul for a service by service name. Returns a list of service names.""" index = None rel_key = "{0}:rel".format(service_name) while True: index, data = consul_handle.kv.get(rel_key, index=index) if data: return json.loads(data["Value"].decode("utf-8")) else: _logger.warn("No relationships found for {0}. Try again in a bit." .format(service_name)) time.sleep(5) def _lookup_with_consul(consul_handle, service_name, max_attempts=0): num_attempts = 1 while True: index, results = consul_handle.catalog.service(service_name) if results: return results else: num_attempts += 1 if max_attempts > 0 and max_attempts < num_attempts: return None _logger.warn("Service not found {0}. Trying again in a bit." .format(service_name)) time.sleep(5) def _register_with_consul(consul_handle, service_name, service_ip, service_port, health_endpoint): # https://www.consul.io/docs/agent/http/agent.html#agent_service_register # Note: Unhealthy services should not return in queries i.e. # dig @127.0.0.1 -p 8600 foo.service.consul health_url = "http://{0}:{1}/{2}".format(service_ip, service_port, health_endpoint) return consul_handle.agent.service.register(service_name, address=service_ip, port=service_port, check= { "HTTP": health_url, "Interval": "5s" }) ##### # Config binding service call ##### def _get_configuration_resolved_from_cbs(consul_handle, service_name): """ This is what a minimal python client library that wraps the CBS would look like. POSSIBLE TODO: break this out into pypi repo This call does not raise an exception if Consul or the CBS cannot complete the request. It logs an error and returns {} if the config is not bindable. It could be a temporary network outage. Call me again later. It will raise an exception if the necessary env parameters were not set because that is irrecoverable. This function is called in my /heatlhcheck, so this will be caught early. """ config = {} results = _lookup_with_consul(consul_handle, "config_binding_service", max_attempts=5) if results is None: logger.error("Cannot bind config at this time, cbs is unreachable") else: cbs_hostname = results[0]["ServiceAddress"] cbs_port = results[0]["ServicePort"] cbs_url = "http://{hostname}:{port}".format(hostname=cbs_hostname, port=cbs_port) #get my config my_config_endpoint = "{0}/service_component/{1}".format(cbs_url, service_name) res = requests.get(my_config_endpoint) try: res.raise_for_status() config = res.json() _logger.info("get_config returned the following configuration: {0}".format(json.dumps(config))) except: _logger.error("in get_config, the config binding service endpoint {0} blew up on me. Error code: {1}, Error text: {2}".format(my_config_endpoint, res.status_code, res.text)) return config ##### # Functionality for putting together service's configuration ##### def _get_connection_types(config): """Get all the connection types for a given configuration json Crawls through the entire config dict recursively and returns the entries that have been identified as service connections in the form of a list of tuples - [(config key, component type), ..] where "config key" is a compound key in the form of a tuple. Each entry in the compound key is a key to a level within the json data structure.""" def grab_component_type(v): # To support Python2, unicode strings are not type `str`. Specifically, # the config string values from Consul maybe encoded to utf-8 so better # be prepared. if isinstance(v, six.string_types): # Regex matches on strings like "{{foo}}" and "{{ BAR }}" and # extracts the alphanumeric string inside the parantheses. result = re.match("^{{\s*([-_.\w]*)\s*}}", v) return result.group(1) if result else None def crawl(config, parent_key=()): if isinstance(config, dict): rels = [ crawl(value, parent_key + (key, )) for key, value in config.items() ] rels = chain(*rels) elif isinstance(config, list): rels = [ crawl(config[index], parent_key + (index, )) for index in range(0, len(config)) ] rels = chain(*rels) else: rels = [(parent_key, grab_component_type(config))] # Filter out the entries with Nones rels = [(key, rel) for key, rel in rels if rel] return rels return crawl(config) def _has_connections(config): return True if _get_connection_types(config) else False def _resolve_connection_types(service_name, connection_types, relationships): def find_match(connection_type): ret_list = [] for rel in relationships: if connection_type in rel: ret_list.append(rel) return ret_list return [ (key, find_match(connection_type)) for key, connection_type in connection_types ] def _resolve_name(lookup_func, service_name): """Resolves the service component name to detailed connection information Currently this is grouped into two ways: 1. CDAP applications take a two step approach - call Consul then call the CDAP broker 2. All other applications just call Consul to get IP and port Args: ---- lookup_func: fn(string) -> list of dicts The function should return a list of dicts that have "ServiceAddress" and "ServicePort" key value entries service_name: (string) service name to lookup Return depends upon the connection type: 1. CDAP applications return a dict 2. All other applications return a string """ def handle_result(result): ip = result["ServiceAddress"] port = result["ServicePort"] if not (ip and port): raise DiscoveryResolvingNameError( "Failed to resolve name for {0}: ip, port not set".format(service_name)) # TODO: Need a better way to identify CDAP apps. Really need to make this # better. if "cdap" in service_name: redirectish_url = "http://{0}:{1}/application/{2}".format(ip, port, service_name) r = requests.get(redirectish_url) r.raise_for_status() details = r.json() # Pick out the details to expose to the component developers return { key: details[key] for key in ["connectionurl", "serviceendpoints"] } else: return "{0}:{1}".format(ip, port) try: results = lookup_func(service_name) return [ handle_result(result) for result in results ] except Exception as e: raise DiscoveryResolvingNameError( "Failed to resolve name for {0}: {1}".format(service_name, e)) def _resolve_configuration_dict(ch, service_name, config): """ Helper used by both resolve_configuration_dict and get_configuration """ if _has_connections(config): rels = _get_relationships_from_consul(ch, service_name) connection_types = _get_connection_types(config) connection_names = _resolve_connection_types(service_name, connection_types, rels) # NOTE: The hardcoded use of the first element. This is to keep things backwards # compatible since resolve name now returns a list. for key, conn in [(key, [_resolve_name(partial(_lookup_with_consul, ch), name)[0] for name in names]) for key, names in connection_names]: config = util.update_json(config, key, conn) _logger.info("Generated config: {0}".format(config)) return config ##### # Public calls ##### def get_consul_hostname(consul_hostname_override=None): """Get the Consul hostname""" try: return consul_hostname_override \ if consul_hostname_override else os.environ["CONSUL_HOST"] except: raise DiscoveryInitError("CONSUL_HOST variable has not been set!") def get_service_name(): """Get the full service name This is expected to be given from whatever entity is starting this service and given by an environment variable called "HOSTNAME".""" try: return os.environ["HOSTNAME"] except: raise DiscoveryInitError("HOSTNAME variable has not been set!") def resolve_name(consul_host, service_name, max_attempts=3): """Resolve the service name Do a service discovery lookup from Consul and return back the detailed connection information. Returns: -------- For CDAP apps, returns a dict. All others a string with the format ":" """ ch = consul.Consul(host=consul_host) lookup_func = partial(_lookup_with_consul, ch, max_attempts=max_attempts) return _resolve_name(lookup_func, service_name) def resolve_configuration_dict(consul_host, service_name, config): """ Utility method for taking a given service_name, and config dict, and resolving it """ ch = consul.Consul(host=consul_host) return _resolve_configuration_dict(ch, service_name, config) def get_configuration(override_consul_hostname=None, override_service_name=None, from_cbs=True): """Provides this service component's configuration information fully resolved This method can either resolve the configuration locally here or make a remote call to the config binding service. The default is to use the config binding service. Args: ----- override_consul_hostname (string): Consul hostname to use rather than the one set by the environment variable CONSUL_HOST override_service_name (string): Use this name over the name set on the HOSTNAME environment variable. Default is None. from_cbs (boolean): True (default) means use the config binding service otherwise set to False to have the config pulled and resolved by this library Returns the fully resolved service component configuration as a dict """ # Get config, bootstrap consul_hostname = get_consul_hostname(override_consul_hostname) # NOTE: We use the default port 8500 ch = consul.Consul(host=consul_hostname) service_name = override_service_name if override_service_name else get_service_name() _logger.info("service name: {0}".format(service_name)) if from_cbs: return _get_configuration_resolved_from_cbs(ch, service_name) else: # The following will happen: # # 1. Fetching the configuration by service component name from Consul # 2. Fetching the relationships for this service component by service component # name # 3. Pick out the connection types from the templetized fields in the configuration # 4. Resolve the connection types with connection names using the step #2 # information # 5. Resolve the connection names with the actual connection via queries to # Consul using the connection name config = _get_configuration_from_consul(ch, service_name) return _resolve_configuration_dict(ch, service_name, config) def register_for_discovery(consul_host, service_ip, service_port): """Register the service component for service discovery This is required in order for other services to "discover" you so that you can service their requests. NOTE: Applications may not need to make this call depending upon if the environment is using Registrator. """ ch = consul.Consul(host=consul_host) service_name = get_service_name() if _register_with_consul(ch, service_name, service_ip, service_port, "health"): _logger.info("Registered to consul: {0}".format(service_name)) else: _logger.error("Failed to register to consul: {0}".format(service_name)) raise DiscoveryRegistrationError()