summaryrefslogtreecommitdiffstats
path: root/oti/event-handler/otihandler/consul_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'oti/event-handler/otihandler/consul_client.py')
-rw-r--r--oti/event-handler/otihandler/consul_client.py617
1 files changed, 0 insertions, 617 deletions
diff --git a/oti/event-handler/otihandler/consul_client.py b/oti/event-handler/otihandler/consul_client.py
deleted file mode 100644
index 1b25f3e..0000000
--- a/oti/event-handler/otihandler/consul_client.py
+++ /dev/null
@@ -1,617 +0,0 @@
-# ================================================================================
-# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-
-"""client to talk to consul at consul port 8500"""
-
-import base64
-import copy
-import json
-import logging
-import os
-import re
-import socket
-
-import requests
-
-
-class ConsulClientError(RuntimeError):
- pass
-
-class ConsulClientConnectionError(RuntimeError):
- pass
-
-class ConsulClientServiceNotFoundError(RuntimeError):
- pass
-
-class ConsulClientNodeNotFoundError(RuntimeError):
- pass
-
-class ConsulClientKVEntryNotFoundError(RuntimeError):
- pass
-
-
-class ConsulClient(object):
- """talking to consul"""
-
- CONSUL_SERVICE_MASK = "{}/v1/catalog/service/{}"
- CONSUL_KV_MASK = "{}/v1/kv/{}"
- CONSUL_KVS_MASK = "{}/v1/kv/{}?recurse=true"
- CONSUL_TRANSACTION_URL = "{}/v1/txn"
- _logger = logging.getLogger("oti_handler.consul_client")
-
- MAX_OPS_PER_TXN = 64
- # MAX_VALUE_LEN = 512 * 1000
-
- OPERATION_SET = "set"
- OPERATION_DELETE = "delete"
- OPERATION_DELETE_FOLDER = "delete-tree"
-
-
- #----- Methods for Consul services
-
- @staticmethod
- def lookup_service(service_name):
- """find the service record in consul"""
-
- service_path = ConsulClient.CONSUL_SERVICE_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), service_name)
-
- ConsulClient._logger.info("lookup_service(%s)", service_path)
-
- try:
- response = requests.get(service_path, timeout=30)
- response.raise_for_status()
- # except requests.exceptions.HTTPError as e:
- # except requests.exceptions.ConnectionError as e:
- # except requests.exceptions.Timeout as e:
- except requests.exceptions.RequestException as e:
- msg = "lookup_service({}) requests.get({}) threw exception {}: {!s}".format(
- service_name, service_path, type(e).__name__, e)
- ConsulClient._logger.error(msg)
- raise ConsulClientConnectionError(msg)
-
- try:
- return_list = response.json()
- # except ValueError as e:
- except Exception as e:
- msg = "lookup_service({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format(
- service_name, service_path, type(e).__name__, e)
- ConsulClient._logger.error(msg)
- raise ConsulClientServiceNotFoundError(msg)
-
- if not return_list:
- msg = "lookup_service({}) got empty or no value from requests.get({})".format(
- service_name, service_path)
- ConsulClient._logger.error(msg)
- raise ConsulClientServiceNotFoundError(msg)
-
- return return_list
-
-
- @staticmethod
- def get_all_services():
- """List all services from consul"""
-
- service_path = "{}/v1/catalog/services".format(os.environ.get("CONSUL_URL").rstrip("/"))
-
- ConsulClient._logger.info("get_all_services(%s)", service_path)
-
- try:
- response = requests.get(service_path, timeout=30)
- response.raise_for_status()
- except requests.exceptions.RequestException as e:
- msg = "get_all_services() requests.get({}) threw exception {}: {!s}".format(
- service_path, type(e).__name__, e)
- ConsulClient._logger.error(msg)
- raise ConsulClientConnectionError(msg)
-
- try:
- return_dict = response.json()
- except Exception as e:
- msg = "get_all_services() parsing JSON from requests.get({}) threw exception {}: {!s}".format(
- service_path, type(e).__name__, e)
- ConsulClient._logger.error(msg)
- raise ConsulClientServiceNotFoundError(msg)
-
- if not return_dict:
- msg = "get_all_services() got empty or no value from requests.get({})".format(
- service_path)
- ConsulClient._logger.info(msg)
- # raise ConsulClientServiceNotFoundError(msg)
-
- return return_dict
-
-
- @staticmethod
- def _find_matching_services(services, name_search, tags):
- """Find matching services given search criteria"""
- sub_tags = tags[0][4:6]
- tags.append(sub_tags)
-
- def is_match(service):
- srv_name, srv_tags = service
- return name_search in srv_name and \
- any([tag in srv_tags for tag in tags])
-
- return [ srv[0] for srv in list(services.items()) if is_match(srv) ]
-
-
- @staticmethod
- def search_services(name_search, tags):
- """
- Search for services that match criteria
-
- Args:
- -----
- name_search: (string) Name to search for as a substring
- tags: (list) List of strings that are tags. A service must match **ANY OF** the
- tags in the list.
-
- Returns:
- --------
- List of names of services that matched
- """
-
- matches = []
-
- # srvs is dict where key is service name and value is list of tags
- srvs = ConsulClient.get_all_services()
-
- if srvs:
- matches = ConsulClient._find_matching_services(srvs, name_search, tags)
-
- return matches
-
-
- @staticmethod
- def get_service_fqdn_port(service_name, node_meta=False):
- """find the service record in consul"""
-
- service_path = ConsulClient.CONSUL_SERVICE_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), service_name)
-
- ConsulClient._logger.info("get_service_fqdn_port(%s)", service_path)
-
- try:
- response = requests.get(service_path, timeout=30)
- response.raise_for_status()
- except requests.exceptions.RequestException as e:
- msg = "get_service_fqdn_port({}) requests.get({}) threw exception {}: {!s}".format(
- service_name, service_path, type(e).__name__, e)
- ConsulClient._logger.error(msg)
- raise ConsulClientConnectionError(msg)
-
- try:
- service = response.json()
- except Exception as e:
- msg = "get_service_fqdn_port({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format(
- service_name, service_path, type(e).__name__, e)
- ConsulClient._logger.error(msg)
- raise ConsulClientServiceNotFoundError(msg)
-
- if not service:
- msg = "get_service_fqdn_port({}) got empty or no value from requests.get({})".format(
- service_name, service_path)
- ConsulClient._logger.error(msg)
- raise ConsulClientServiceNotFoundError(msg)
-
- try:
- service = service[0] # arbitrarily choose the first one
- port = service["ServicePort"]
-
- # HTTPS certificate validation requires FQDN not IP address
- fqdn = ""
- if node_meta:
- meta = service.get("NodeMeta")
- if meta:
- fqdn = meta.get("fqdn")
- if not fqdn:
- fqdn = socket.getfqdn(str(service["ServiceAddress"]))
- except Exception as e:
- msg = "get_service_fqdn_port({}) parsing result from requests.get({}) threw exception {}: {!s}".format(
- service_name, service_path, type(e).__name__, e)
- ConsulClient._logger.error(msg)
- raise ConsulClientServiceNotFoundError(msg)
-
- return (fqdn, port)
-
-
- #----- Methods for Consul nodes
-
- @staticmethod
- def lookup_node(node_name):
- """find the node record in consul"""
-
- node_path = "{}/v1/catalog/node/{}".format(os.environ.get("CONSUL_URL").rstrip("/"), node_name)
-
- ConsulClient._logger.info("lookup_node(%s)", node_path)
-
- try:
- response = requests.get(node_path, timeout=30)
- response.raise_for_status()
- # except requests.exceptions.HTTPError as e:
- # except requests.exceptions.ConnectionError as e:
- # except requests.exceptions.Timeout as e:
- except requests.exceptions.RequestException as e:
- msg = "lookup_node({}) requests.get({}) threw exception {}: {!s}".format(
- node_name, node_path, type(e).__name__, e)
- ConsulClient._logger.error(msg)
- raise ConsulClientConnectionError(msg)
-
- try:
- return_dict = response.json()
- # except ValueError as e:
- except Exception as e:
- msg = "lookup_node({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format(
- node_name, node_path, type(e).__name__, e)
- ConsulClient._logger.error(msg)
- raise ConsulClientNodeNotFoundError(msg)
-
- if not return_dict:
- msg = "lookup_node({}) got empty or no value from requests.get({})".format(
- node_name, node_path)
- ConsulClient._logger.error(msg)
- raise ConsulClientNodeNotFoundError(msg)
-
- return return_dict
-
-
- #----- Methods for Consul key-values
-
- @staticmethod
- def put_value(key, data, cas=None):
- """put the value for key into consul-kv"""
-
- # ConsulClient._logger.info("put_value(%s)", str(key))
-
- URL = ConsulClient.CONSUL_KV_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), key)
- if cas is not None:
- URL = '{}?cas={}'.format(URL, cas)
-
- try:
- response = requests.put(URL, data=json.dumps(data), timeout=30)
- response.raise_for_status()
- except requests.exceptions.RequestException as e:
- msg = "put_value({}) requests.put({}) threw exception {}: {!s}".format(
- key, URL, type(e).__name__, e)
- ConsulClient._logger.error(msg)
- raise ConsulClientConnectionError(msg)
-
- try:
- updated = response.json()
- except Exception as e:
- msg = "put_value({}) parsing JSON from requests.put({}) threw exception {}: {!s}".format(
- key, URL, type(e).__name__, e)
- ConsulClient._logger.error(msg)
- raise ConsulClientKVEntryNotFoundError(msg)
-
- return updated
-
-
- @staticmethod
- def get_value(key, get_index=False):
- """get the value for key from consul-kv"""
-
- URL = ConsulClient.CONSUL_KV_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), key)
-
- try:
- response = requests.get(URL, timeout=30)
- response.raise_for_status()
- except requests.exceptions.RequestException as e:
- msg = "get_value({}) requests.get({}) threw exception {}: {!s}".format(
- key, URL, type(e).__name__, e)
- ConsulClient._logger.error(msg)
- raise ConsulClientConnectionError(msg)
-
- try:
- data = response.json()
- except Exception as e:
- msg = "get_value({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format(
- key, URL, type(e).__name__, e)
- ConsulClient._logger.error(msg)
- raise ConsulClientKVEntryNotFoundError(msg)
-
- if not data:
- msg = "get_value({}) got empty or no value from requests.get({})".format(
- key, URL)
- ConsulClient._logger.error(msg)
- raise ConsulClientKVEntryNotFoundError(msg)
-
- try:
- value = base64.b64decode(data[0]["Value"]).decode("utf-8")
- value_dict = json.loads(value)
- except Exception as e:
- msg = "get_value({}) decoding value from requests.get({}) threw exception {}: {!s}".format(
- key, URL, type(e).__name__, e)
- ConsulClient._logger.error(msg)
- raise ConsulClientKVEntryNotFoundError(msg)
-
- ConsulClient._logger.info("consul-kv key=%s value(%s) data=%s",
- key, value, json.dumps(data))
-
- if get_index:
- return data[0]["ModifyIndex"], value_dict
-
- return value_dict
-
-
- @staticmethod
- def get_kvs(prefix, nest=True, trim_prefix=False):
- """get key-values for keys beginning with prefix from consul-kv"""
-
- URL = ConsulClient.CONSUL_KVS_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), prefix)
-
- try:
- response = requests.get(URL, timeout=30)
- response.raise_for_status()
- except requests.exceptions.RequestException as e:
- msg = "get_kvs({}) requests.get({}) threw exception {}: {!s}".format(
- prefix, URL, type(e).__name__, e)
- ConsulClient._logger.error(msg)
- raise ConsulClientConnectionError(msg)
-
- try:
- data = response.json()
- except Exception as e:
- msg = "get_kvs({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format(
- prefix, URL, type(e).__name__, e)
- ConsulClient._logger.error(msg)
- raise ConsulClientKVEntryNotFoundError(msg)
-
- if not data:
- msg = "get_kvs({}) got empty or no value from requests.get({})".format(
- prefix, URL)
- ConsulClient._logger.error(msg)
- raise ConsulClientKVEntryNotFoundError(msg)
-
- def put_level_value(level_keys, value, level_dict={}):
- if level_keys:
- key = level_keys.pop(0)
- level_dict[key] = put_level_value(level_keys, value, level_dict.get(key, {}))
- return level_dict
- else:
- return value
-
- rdict = {}
- for item in data:
- v = base64.b64decode(item["Value"]).decode("utf-8")
- try:
- value = json.loads(v)
- except Exception as e:
- value = v
- key = item['Key']
- if trim_prefix:
- key = key[len(prefix):]
- if nest:
- level_keys = key.split('/')
- rdict = put_level_value(level_keys, value, rdict)
- else:
- rdict[key] = value
-
- ConsulClient._logger.info("consul-kv prefix=%s value(%s) data=%s",
- prefix, json.dumps(rdict), json.dumps(data))
- return rdict
-
-
- @staticmethod
- def _gen_txn_operation(verb, key, value=None):
- """returns the properly formatted operation to be used inside transaction"""
-
- # key = urllib.quote(key) # can't use urllib.quote() because it kills ':' in the key
- if value:
- return {"KV": {"Verb": verb, "Key": key, "Value": base64.b64encode(bytes(value, "utf-8")).decode("utf-8")}}
- return {"KV": {"Verb": verb, "Key": key}}
-
-
- @staticmethod
- def _run_transaction(operation_name, txn):
- """run a single transaction of several operations at consul /txn"""
-
- if not txn:
- return
-
- txn_url = ConsulClient.CONSUL_TRANSACTION_URL.format(os.environ.get("CONSUL_URL").rstrip("/"))
- response = None
- try:
- response = requests.put(txn_url, json=txn, timeout=30)
- except requests.exceptions.RequestException as e:
- ConsulClient._logger.error("failed to {} at {}: exception {}: {!s} on txn={}"
- .format(operation_name, txn_url, type(e).__name__, e, json.dumps(txn)))
- return
-
- if response.status_code != requests.codes.ok:
- ConsulClient._logger.error("failed {} {}: {} text={} txn={} headers={}"
- .format(operation_name, txn_url, response.status_code,
- response.text, json.dumps(txn),
- json.dumps(dict(list(response.request.headers.items())))))
- return
-
- ConsulClient._logger.info("response for {} {}: {} text={} txn={} headers={}"
- .format(operation_name, txn_url, response.status_code,
- response.text, json.dumps(txn),
- json.dumps(dict(list(response.request.headers.items())))))
-
- return True
-
-
- @staticmethod
- def store_kvs(kvs):
- """put kvs into consul-kv"""
-
- if not kvs:
- ConsulClient._logger.warning("kvs not supplied to store_kvs()")
- return
-
- store_kvs = [
- ConsulClient._gen_txn_operation(ConsulClient.OPERATION_SET,
- key, json.dumps(value))
- for key, value in kvs.items()
- ]
- txn = []
- idx_step = ConsulClient.MAX_OPS_PER_TXN - len(txn)
- for idx in range(0, len(store_kvs), idx_step):
- txn += store_kvs[idx : idx + idx_step]
- if not ConsulClient._run_transaction("store_kvs", txn):
- return False
- txn = []
-
- return ConsulClient._run_transaction("store_kvs", txn)
-
-
- @staticmethod
- def delete_key(key):
- """delete key from consul-kv"""
-
- if not key:
- ConsulClient._logger.warning("key not supplied to delete_key()")
- return
-
- delete_key = [
- ConsulClient._gen_txn_operation(ConsulClient.OPERATION_DELETE, key)
- ]
- return ConsulClient._run_transaction("delete_key", delete_key)
-
-
- @staticmethod
- def delete_kvs(key):
- """delete key from consul-kv"""
-
- if not key:
- ConsulClient._logger.warning("key not supplied to delete_kvs()")
- return
-
- delete_kvs = [
- ConsulClient._gen_txn_operation(ConsulClient.OPERATION_DELETE_FOLDER, key)
- ]
- return ConsulClient._run_transaction("delete_kvs", delete_kvs)
-
-
- #----- Methods for Config Binding Service
-
- @staticmethod
- def get_service_component(scn):
- config = json.dumps(ConsulClient.get_value(scn))
-
- try:
- dmaap = ConsulClient.get_value(scn + ":dmaap")
- except Exception as e:
- dmaap = None
- if dmaap:
- for key in list(dmaap.keys()):
- config = re.sub('"<<' + key + '>>"', json.dumps(dmaap[key]), config)
-
- try:
- rel = ConsulClient.get_value(scn + ":rel")
- except Exception as e:
- rel = None
- if rel:
- for key in list(rel.keys()):
- config = re.sub('"{{' + key + '}}"', json.dumps(rel[key]), config)
-
- return json.loads(config)
-
-
- @staticmethod
- def get_service_component_all(scn, policies_as_list=True):
- t_scn = scn + ":"
- t_len = len(t_scn)
- a_dict = ConsulClient.get_kvs(scn)
- b_dict = {}
- for key in a_dict:
- b_key = None
- if key == scn:
- b_dict["config"] = ConsulClient.get_service_component(scn)
- elif key == scn + ":dmaap":
- continue
- elif key[0:t_len] == t_scn:
- b_key = key[t_len:]
- # policies_as_list = True formats policies items in a list like ONAP's CBS; False keeps policy_ids keys
- if policies_as_list and b_key == "policies": # convert items from KVs to a values list
- b_dict[b_key] = {}
- for sub_key in a_dict[key]:
- if sub_key == "items":
- b_dict[b_key][sub_key] = []
- d_dict = a_dict[key][sub_key]
- for item in sorted(d_dict.keys()): # old CBS sorted them so we emulate
- b_dict[b_key][sub_key].append(d_dict[item])
- else:
- b_dict[b_key][sub_key] = copy.deepcopy(a_dict[key][sub_key])
- else:
- b_dict[b_key] = copy.deepcopy(a_dict[key])
- return b_dict
-
-
- @staticmethod
- def add_vnf_id(scn, vnf_type, vnf_id, dti_dict):
- """
- Add VNF instance to Consul scn:oti key.
-
- Treat its value as a JSON string representing a dict.
- Extend the dict by adding a dti_dict for vnf_id under vnf_type.
- Turn the resulting extended dict into a JSON string.
- Store the string back into Consul under scn:oti key.
- Watch out for conflicting concurrent updates.
- """
-
- key = scn + ':oti'
- lc_vnf_type = vnf_type.lower()
- while True: # do until update succeeds
- (mod_index, v) = ConsulClient.get_value(key, get_index=True)
- lc_v = {ky.lower():vl for ky,vl in list(v.items())} # aware this arbitrarily picks keys that only differ in case
- # but DCAE-C doesn't create such keys
-
- if lc_vnf_type not in lc_v:
- return # That VNF type is not supported by this component
- lc_v[lc_vnf_type][vnf_id] = dti_dict # add or replace the VNF instance
-
- updated = ConsulClient.put_value(key, lc_v, cas=mod_index)
- if updated:
- return lc_v
-
-
- @staticmethod
- def delete_vnf_id(scn, vnf_type, vnf_id):
- """
- Delete VNF instance from Consul scn:oti key.
-
- Treat its value as a JSON string representing a dict.
- Modify the dict by deleting the vnf_id key entry from under vnf_type.
- Turn the resulting extended dict into a JSON string.
- Store the string back into Consul under scn:oti key.
- Watch out for conflicting concurrent updates.
- """
-
- key = scn + ':oti'
- lc_vnf_type = vnf_type.lower()
- while True: # do until update succeeds
- (mod_index, v) = ConsulClient.get_value(key, get_index=True)
- lc_v = {ky.lower():vl for ky,vl in list(v.items())} # aware this arbitrarily picks keys that only differ in case
- # but DCAE-C doesn't create such keys
-
- if lc_vnf_type not in lc_v:
- return # That VNF type is not supported by this component
- if vnf_id not in lc_v[lc_vnf_type]:
- return lc_v
- del lc_v[lc_vnf_type][vnf_id] # delete the VNF instance
-
- updated = ConsulClient.put_value(key, lc_v, cas=mod_index)
- if updated:
- return lc_v
-
-
-if __name__ == "__main__":
- value = None
-
- if value:
- print(json.dumps(value, sort_keys=True, indent=4, separators=(',', ': ')))