summaryrefslogtreecommitdiffstats
path: root/k8s/k8splugin/discovery.py
diff options
context:
space:
mode:
authorJack Lucas <jflucas@research.att.com>2018-03-22 17:43:01 +0000
committerJack Lucas <jflucas@research.att.com>2018-03-23 12:46:10 +0000
commit1ed29c111f50e33ac89915ba86b530820da3a689 (patch)
tree314ad8dc5dd2cad4ba89bac5662015f8a9c43ced /k8s/k8splugin/discovery.py
parent19a075b395d687753de0f0266842cb9af8f24682 (diff)
Add DCAE Kubernetes plugin
Change-Id: Ic329f77bb09dcccfd49a34a8f84d387e8da929c6 Issue-ID: DCAEGEN2-410 Signed-off-by: Jack Lucas <jflucas@research.att.com>
Diffstat (limited to 'k8s/k8splugin/discovery.py')
-rw-r--r--k8s/k8splugin/discovery.py269
1 files changed, 269 insertions, 0 deletions
diff --git a/k8s/k8splugin/discovery.py b/k8s/k8splugin/discovery.py
new file mode 100644
index 0000000..f3b87b6
--- /dev/null
+++ b/k8s/k8splugin/discovery.py
@@ -0,0 +1,269 @@
+# ============LICENSE_START=======================================================
+# org.onap.dcae
+# ================================================================================
+# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+from functools import partial
+import json
+import logging
+import uuid
+import requests
+import consul
+import re
+
+
+logger = logging.getLogger("discovery")
+
+
+class DiscoveryError(RuntimeError):
+ pass
+
+class DiscoveryConnectionError(RuntimeError):
+ pass
+
+class DiscoveryServiceNotFoundError(RuntimeError):
+ pass
+
+class DiscoveryKVEntryNotFoundError(RuntimeError):
+ pass
+
+
+def _wrap_consul_call(consul_func, *args, **kwargs):
+ """Wrap Consul call to map errors"""
+ try:
+ return consul_func(*args, **kwargs)
+ except requests.exceptions.ConnectionError as e:
+ raise DiscoveryConnectionError(e)
+
+
+def generate_service_component_name(service_component_type):
+ """Generate service component id used to pass into the service component
+ instance and used as the key to the service component configuration.
+
+ Updated for use with Kubernetes. Sometimes the service component name gets
+ used in Kubernetes in contexts (such as naming a Kubernetes Service) that
+ requires the name to conform to the RFC1035 DNS "label" syntax:
+ -- starts with an alpha
+ -- contains only of alphanumerics and "-"
+ -- <= 63 characters long
+
+ Format:
+ s<service component id>-<service component type>,
+ truncated to 63 characters, "_" replaced with "-" in service_component_type,
+ other non-conforming characters removed from service_component_type
+ """
+ # Random generated
+ # Copied from cdap plugin
+ sct = re.sub('[^A-Za-z0-9-]','',(service_component_type.replace('_','-')))
+ return ("s{0}-{1}".format(str(uuid.uuid4()).replace("-",""),sct))[:63]
+
+
+def create_kv_conn(host):
+ """Create connection to key-value store
+
+ Returns a Consul client to the specified Consul host"""
+ try:
+ [hostname, port] = host.split(":")
+ return consul.Consul(host=hostname, port=int(port))
+ except ValueError as e:
+ return consul.Consul(host=host)
+
+def push_service_component_config(kv_conn, service_component_name, config):
+ config_string = config if isinstance(config, str) else json.dumps(config)
+ kv_put_func = partial(_wrap_consul_call, kv_conn.kv.put)
+
+ if kv_put_func(service_component_name, config_string):
+ logger.info("Added config for {0}".format(service_component_name))
+ else:
+ raise DiscoveryError("Failed to push configuration")
+
+def remove_service_component_config(kv_conn, service_component_name):
+ kv_delete_func = partial(_wrap_consul_call, kv_conn.kv.delete)
+ kv_delete_func(service_component_name)
+
+
+def get_kv_value(kv_conn, key):
+ """Get a key-value entry's value from Consul
+
+ Raises DiscoveryKVEntryNotFoundError if entry not found
+ """
+ kv_get_func = partial(_wrap_consul_call, kv_conn.kv.get)
+ (index, val) = kv_get_func(key)
+
+ if val:
+ return json.loads(val['Value']) # will raise ValueError if not JSON, let it propagate
+ else:
+ raise DiscoveryKVEntryNotFoundError("{0} kv entry not found".format(key))
+
+
+def _create_rel_key(service_component_name):
+ return "{0}:rel".format(service_component_name)
+
+def store_relationship(kv_conn, source_name, target_name):
+ # TODO: Rel entry may already exist in a one-to-many situation. Need to
+ # support that.
+ rel_key = _create_rel_key(source_name)
+ rel_value = [target_name] if target_name else []
+
+ kv_put_func = partial(_wrap_consul_call, kv_conn.kv.put)
+ kv_put_func(rel_key, json.dumps(rel_value))
+ logger.info("Added relationship for {0}".format(rel_key))
+
+def delete_relationship(kv_conn, service_component_name):
+ rel_key = _create_rel_key(service_component_name)
+ kv_get_func = partial(_wrap_consul_call, kv_conn.kv.get)
+ index, rels = kv_get_func(rel_key)
+
+ if rels:
+ rels = json.loads(rels["Value"].decode("utf-8"))
+ kv_delete_func = partial(_wrap_consul_call, kv_conn.kv.delete)
+ kv_delete_func(rel_key)
+ return rels
+ else:
+ return []
+
+def lookup_service(kv_conn, service_component_name):
+ catalog_get_func = partial(_wrap_consul_call, kv_conn.catalog.service)
+ index, results = catalog_get_func(service_component_name)
+
+ if results:
+ return results
+ else:
+ raise DiscoveryServiceNotFoundError("Failed to find: {0}".format(service_component_name))
+
+
+# TODO: Note these functions have been (for the most part) shamelessly lifted from
+# dcae-cli and should really be shared.
+
+def _is_healthy_pure(get_health_func, instance):
+ """Checks to see if a component instance is running healthy
+
+ Pure function edition
+
+ Args
+ ----
+ get_health_func: func(string) -> complex object
+ Look at unittests in test_discovery to see examples
+ instance: (string) fully qualified name of component instance
+
+ Returns
+ -------
+ True if instance has been found and is healthy else False
+ """
+ index, resp = get_health_func(instance)
+
+ if resp:
+ def is_passing(instance):
+ return all([check["Status"] == "passing" for check in instance["Checks"]])
+
+ return any([is_passing(instance) for instance in resp])
+ else:
+ return False
+
+def is_healthy(consul_host, instance):
+ """Checks to see if a component instance is running healthy
+
+ Impure function edition
+
+ Args
+ ----
+ consul_host: (string) host string of Consul
+ instance: (string) fully qualified name of component instance
+
+ Returns
+ -------
+ True if instance has been found and is healthy else False
+ """
+ cons = create_kv_conn(consul_host)
+
+ get_health_func = partial(_wrap_consul_call, cons.health.service)
+ return _is_healthy_pure(get_health_func, instance)
+
+
+def add_to_entry(conn, key, add_name, add_value):
+ """
+ Find 'key' in consul.
+ Treat its value as a JSON string representing a dict.
+ Extend the dict by adding an entry with key 'add_name' and value 'add_value'.
+ Turn the resulting extended dict into a JSON string.
+ Store the string back into Consul under 'key'.
+ Watch out for conflicting concurrent updates.
+
+ Example:
+ Key 'xyz:dmaap' has the value '{"feed00": {"feed_url" : "http://example.com/feeds/999"}}'
+ add_to_entry('xyz:dmaap', 'topic00', {'topic_url' : 'http://example.com/topics/1229'})
+ should result in the value for key 'xyz:dmaap' in consul being updated to
+ '{"feed00": {"feed_url" : "http://example.com/feeds/999"}, "topic00" : {"topic_url" : "http://example.com/topics/1229"}}'
+ """
+ while True: # do until update succeeds
+ (index, val) = conn.kv.get(key) # index gives version of key retrieved
+
+ if val is None: # no key yet
+ vstring = '{}'
+ mod_index = 0 # Use 0 as the cas index for initial insertion of the key
+ else:
+ vstring = val['Value']
+ mod_index = val['ModifyIndex']
+
+ # Build the updated dict
+ # Exceptions just propagate
+ v = json.loads(vstring)
+ v[add_name] = add_value
+ new_vstring = json.dumps(v)
+
+ updated = conn.kv.put(key, new_vstring, cas=mod_index) # if the key has changed since retrieval, this will return false
+ if updated:
+ return v
+
+
+def _find_matching_services(services, name_search, tags):
+ """Find matching services given search criteria"""
+ def is_match(service):
+ srv_name, srv_tags = service
+ return name_search in srv_name and \
+ all(map(lambda tag: tag in srv_tags, tags))
+
+ return [ srv[0] for srv in services.items() if is_match(srv) ]
+
+def search_services(conn, name_search, tags):
+ """Search for services that match criteria
+
+ Args:
+ -----
+ name_search: (string) Name to search for as a substring
+ tags: (list) List of strings that are tags. A service must match **all** the
+ tags in the list.
+
+ Retruns:
+ --------
+ List of names of services that matched
+ """
+ # srvs is dict where key is service name and value is list of tags
+ catalog_get_services_func = partial(_wrap_consul_call, conn.catalog.services)
+ index, srvs = catalog_get_services_func()
+
+ if srvs:
+ matches = _find_matching_services(srvs, name_search, tags)
+
+ if matches:
+ return matches
+
+ raise DiscoveryServiceNotFoundError(
+ "No matches found: {0}, {1}".format(name_search, tags))
+ else:
+ raise DiscoveryServiceNotFoundError("No services found")