diff options
author | Kotagiri, Ramprasad (rp5662) <rp5662@att.com> | 2019-12-19 17:41:16 -0500 |
---|---|---|
committer | Kotagiri, Ramprasad (rp5662) <rp5662@att.com> | 2020-01-21 16:50:17 -0500 |
commit | 158b75abd6095a3155f5057832ec868bc99ced36 (patch) | |
tree | d374ba4adcfa6db9a036cb2bf018fe529c215eee /oti/event-handler/otihandler/consul_client.py | |
parent | 77900bb3097491cd9fca964c111ea70724e53989 (diff) |
Add OTI event-handler project
OTI event handler application in DCAEGEN2 platform
Change-Id: Ie64f26f851e2045f00043f90279d503c2dc62948
Issue-ID: DCAEGEN2-1910
Signed-off-by: Kotagiri, Ramprasad (rp5662) <rp5662@att.com>
Diffstat (limited to 'oti/event-handler/otihandler/consul_client.py')
-rw-r--r-- | oti/event-handler/otihandler/consul_client.py | 617 |
1 files changed, 617 insertions, 0 deletions
diff --git a/oti/event-handler/otihandler/consul_client.py b/oti/event-handler/otihandler/consul_client.py new file mode 100644 index 0000000..d26d3a1 --- /dev/null +++ b/oti/event-handler/otihandler/consul_client.py @@ -0,0 +1,617 @@ +# ================================================================================ +# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= + +"""client to talk to consul at consul port 8500""" + +import base64 +import copy +import json +import logging +import os +import re +import socket + +import requests + + +class ConsulClientError(RuntimeError): + pass + +class ConsulClientConnectionError(RuntimeError): + pass + +class ConsulClientServiceNotFoundError(RuntimeError): + pass + +class ConsulClientNodeNotFoundError(RuntimeError): + pass + +class ConsulClientKVEntryNotFoundError(RuntimeError): + pass + + +class ConsulClient(object): + """talking to consul""" + + CONSUL_SERVICE_MASK = "{}/v1/catalog/service/{}" + CONSUL_KV_MASK = "{}/v1/kv/{}" + CONSUL_KVS_MASK = "{}/v1/kv/{}?recurse=true" + CONSUL_TRANSACTION_URL = "{}/v1/txn" + _logger = logging.getLogger("oti_handler.consul_client") + + MAX_OPS_PER_TXN = 64 + # MAX_VALUE_LEN = 512 * 1000 + + OPERATION_SET = "set" + OPERATION_DELETE = "delete" + OPERATION_DELETE_FOLDER = "delete-tree" + + + #----- Methods for Consul services + + @staticmethod + def lookup_service(service_name): + """find the service record in consul""" + + service_path = ConsulClient.CONSUL_SERVICE_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), service_name) + + ConsulClient._logger.info("lookup_service(%s)", service_path) + + try: + response = requests.get(service_path, timeout=30) + response.raise_for_status() + # except requests.exceptions.HTTPError as e: + # except requests.exceptions.ConnectionError as e: + # except requests.exceptions.Timeout as e: + except requests.exceptions.RequestException as e: + msg = "lookup_service({}) requests.get({}) threw exception {}: {!s}".format( + service_name, service_path, type(e).__name__, e) + ConsulClient._logger.error(msg) + raise ConsulClientConnectionError(msg) + + try: + return_list = response.json() + # except ValueError as e: + except Exception as e: + msg = "lookup_service({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format( + service_name, service_path, type(e).__name__, e) + ConsulClient._logger.error(msg) + raise ConsulClientServiceNotFoundError(msg) + + if not return_list: + msg = "lookup_service({}) got empty or no value from requests.get({})".format( + service_name, service_path) + ConsulClient._logger.error(msg) + raise ConsulClientServiceNotFoundError(msg) + + return return_list + + + @staticmethod + def get_all_services(): + """List all services from consul""" + + service_path = "{}/v1/catalog/services".format(os.environ.get("CONSUL_URL").rstrip("/")) + + ConsulClient._logger.info("get_all_services(%s)", service_path) + + try: + response = requests.get(service_path, timeout=30) + response.raise_for_status() + except requests.exceptions.RequestException as e: + msg = "get_all_services() requests.get({}) threw exception {}: {!s}".format( + service_path, type(e).__name__, e) + ConsulClient._logger.error(msg) + raise ConsulClientConnectionError(msg) + + try: + return_dict = response.json() + except Exception as e: + msg = "get_all_services() parsing JSON from requests.get({}) threw exception {}: {!s}".format( + service_path, type(e).__name__, e) + ConsulClient._logger.error(msg) + raise ConsulClientServiceNotFoundError(msg) + + if not return_dict: + msg = "get_all_services() got empty or no value from requests.get({})".format( + service_path) + ConsulClient._logger.info(msg) + # raise ConsulClientServiceNotFoundError(msg) + + return return_dict + + + @staticmethod + def _find_matching_services(services, name_search, tags): + """Find matching services given search criteria""" + sub_tags = tags[0][4:6] + tags.append(sub_tags) + + def is_match(service): + srv_name, srv_tags = service + return name_search in srv_name and \ + any([tag in srv_tags for tag in tags]) + + return [ srv[0] for srv in list(services.items()) if is_match(srv) ] + + + @staticmethod + def search_services(name_search, tags): + """ + Search for services that match criteria + + Args: + ----- + name_search: (string) Name to search for as a substring + tags: (list) List of strings that are tags. A service must match **ANY OF** the + tags in the list. + + Returns: + -------- + List of names of services that matched + """ + + matches = [] + + # srvs is dict where key is service name and value is list of tags + srvs = ConsulClient.get_all_services() + + if srvs: + matches = ConsulClient._find_matching_services(srvs, name_search, tags) + + return matches + + + @staticmethod + def get_service_fqdn_port(service_name, node_meta=False): + """find the service record in consul""" + + service_path = ConsulClient.CONSUL_SERVICE_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), service_name) + + ConsulClient._logger.info("get_service_fqdn_port(%s)", service_path) + + try: + response = requests.get(service_path, timeout=30) + response.raise_for_status() + except requests.exceptions.RequestException as e: + msg = "get_service_fqdn_port({}) requests.get({}) threw exception {}: {!s}".format( + service_name, service_path, type(e).__name__, e) + ConsulClient._logger.error(msg) + raise ConsulClientConnectionError(msg) + + try: + service = response.json() + except Exception as e: + msg = "get_service_fqdn_port({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format( + service_name, service_path, type(e).__name__, e) + ConsulClient._logger.error(msg) + raise ConsulClientServiceNotFoundError(msg) + + if not service: + msg = "get_service_fqdn_port({}) got empty or no value from requests.get({})".format( + service_name, service_path) + ConsulClient._logger.error(msg) + raise ConsulClientServiceNotFoundError(msg) + + try: + service = service[0] # arbitrarily choose the first one + port = service["ServicePort"] + + # HTTPS certificate validation requires FQDN not IP address + fqdn = "" + if node_meta: + meta = service.get("NodeMeta") + if meta: + fqdn = meta.get("fqdn") + if not fqdn: + fqdn = socket.getfqdn(str(service["ServiceAddress"])) + except Exception as e: + msg = "get_service_fqdn_port({}) parsing result from requests.get({}) threw exception {}: {!s}".format( + service_name, service_path, type(e).__name__, e) + ConsulClient._logger.error(msg) + raise ConsulClientServiceNotFoundError(msg) + + return (fqdn, port) + + + #----- Methods for Consul nodes + + @staticmethod + def lookup_node(node_name): + """find the node record in consul""" + + node_path = "{}/v1/catalog/node/{}".format(os.environ.get("CONSUL_URL").rstrip("/"), node_name) + + ConsulClient._logger.info("lookup_node(%s)", node_path) + + try: + response = requests.get(node_path, timeout=30) + response.raise_for_status() + # except requests.exceptions.HTTPError as e: + # except requests.exceptions.ConnectionError as e: + # except requests.exceptions.Timeout as e: + except requests.exceptions.RequestException as e: + msg = "lookup_node({}) requests.get({}) threw exception {}: {!s}".format( + node_name, node_path, type(e).__name__, e) + ConsulClient._logger.error(msg) + raise ConsulClientConnectionError(msg) + + try: + return_dict = response.json() + # except ValueError as e: + except Exception as e: + msg = "lookup_node({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format( + node_name, node_path, type(e).__name__, e) + ConsulClient._logger.error(msg) + raise ConsulClientNodeNotFoundError(msg) + + if not return_dict: + msg = "lookup_node({}) got empty or no value from requests.get({})".format( + node_name, node_path) + ConsulClient._logger.error(msg) + raise ConsulClientNodeNotFoundError(msg) + + return return_dict + + + #----- Methods for Consul key-values + + @staticmethod + def put_value(key, data, cas=None): + """put the value for key into consul-kv""" + + # ConsulClient._logger.info("put_value(%s)", str(key)) + + URL = ConsulClient.CONSUL_KV_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), key) + if cas is not None: + URL = '{}?cas={}'.format(URL, cas) + + try: + response = requests.put(URL, data=json.dumps(data), timeout=30) + response.raise_for_status() + except requests.exceptions.RequestException as e: + msg = "put_value({}) requests.put({}) threw exception {}: {!s}".format( + key, URL, type(e).__name__, e) + ConsulClient._logger.error(msg) + raise ConsulClientConnectionError(msg) + + try: + updated = response.json() + except Exception as e: + msg = "put_value({}) parsing JSON from requests.put({}) threw exception {}: {!s}".format( + key, URL, type(e).__name__, e) + ConsulClient._logger.error(msg) + raise ConsulClientKVEntryNotFoundError(msg) + + return updated + + + @staticmethod + def get_value(key, get_index=False): + """get the value for key from consul-kv""" + + URL = ConsulClient.CONSUL_KV_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), key) + + try: + response = requests.get(URL, timeout=30) + response.raise_for_status() + except requests.exceptions.RequestException as e: + msg = "get_value({}) requests.get({}) threw exception {}: {!s}".format( + key, URL, type(e).__name__, e) + ConsulClient._logger.error(msg) + raise ConsulClientConnectionError(msg) + + try: + data = response.json() + except Exception as e: + msg = "get_value({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format( + key, URL, type(e).__name__, e) + ConsulClient._logger.error(msg) + raise ConsulClientKVEntryNotFoundError(msg) + + if not data: + msg = "get_value({}) got empty or no value from requests.get({})".format( + key, URL) + ConsulClient._logger.error(msg) + raise ConsulClientKVEntryNotFoundError(msg) + + try: + value = base64.b64decode(data[0]["Value"]).decode("utf-8") + value_dict = json.loads(value) + except Exception as e: + msg = "get_value({}) decoding value from requests.get({}) threw exception {}: {!s}".format( + key, URL, type(e).__name__, e) + ConsulClient._logger.error(msg) + raise ConsulClientKVEntryNotFoundError(msg) + + ConsulClient._logger.info("consul-kv key=%s value(%s) data=%s", + key, value, json.dumps(data)) + + if get_index: + return data[0]["ModifyIndex"], value_dict + + return value_dict + + + @staticmethod + def get_kvs(prefix, nest=True, trim_prefix=False): + """get key-values for keys beginning with prefix from consul-kv""" + + URL = ConsulClient.CONSUL_KVS_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), prefix) + + try: + response = requests.get(URL, timeout=30) + response.raise_for_status() + except requests.exceptions.RequestException as e: + msg = "get_kvs({}) requests.get({}) threw exception {}: {!s}".format( + prefix, URL, type(e).__name__, e) + ConsulClient._logger.error(msg) + raise ConsulClientConnectionError(msg) + + try: + data = response.json() + except Exception as e: + msg = "get_kvs({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format( + prefix, URL, type(e).__name__, e) + ConsulClient._logger.error(msg) + raise ConsulClientKVEntryNotFoundError(msg) + + if not data: + msg = "get_kvs({}) got empty or no value from requests.get({})".format( + prefix, URL) + ConsulClient._logger.error(msg) + raise ConsulClientKVEntryNotFoundError(msg) + + def put_level_value(level_keys, value, level_dict={}): + if level_keys: + key = level_keys.pop(0) + level_dict[key] = put_level_value(level_keys, value, level_dict.get(key, {})) + return level_dict + else: + return value + + rdict = {} + for item in data: + v = base64.b64decode(item["Value"]).decode("utf-8") + try: + value = json.loads(v) + except Exception as e: + value = v + key = item['Key'] + if trim_prefix: + key = key[len(prefix):] + if nest: + level_keys = key.split('/') + rdict = put_level_value(level_keys, value, rdict) + else: + rdict[key] = value + + ConsulClient._logger.info("consul-kv prefix=%s value(%s) data=%s", + prefix, json.dumps(rdict), json.dumps(data)) + return rdict + + + @staticmethod + def _gen_txn_operation(verb, key, value=None): + """returns the properly formatted operation to be used inside transaction""" + + # key = urllib.quote(key) # can't use urllib.quote() because it kills ':' in the key + if value: + return {"KV": {"Verb": verb, "Key": key, "Value": base64.b64encode(value)}} + return {"KV": {"Verb": verb, "Key": key}} + + + @staticmethod + def _run_transaction(operation_name, txn): + """run a single transaction of several operations at consul /txn""" + + if not txn: + return + + txn_url = ConsulClient.CONSUL_TRANSACTION_URL.format(os.environ.get("CONSUL_URL").rstrip("/")) + response = None + try: + response = requests.put(txn_url, json=txn, timeout=30) + except requests.exceptions.RequestException as e: + ConsulClient._logger.error("failed to {} at {}: exception {}: {!s} on txn={}" + .format(operation_name, txn_url, type(e).__name__, e, json.dumps(txn))) + return + + if response.status_code != requests.codes.ok: + ConsulClient._logger.error("failed {} {}: {} text={} txn={} headers={}" + .format(operation_name, txn_url, response.status_code, + response.text, json.dumps(txn), + json.dumps(dict(list(response.request.headers.items()))))) + return + + ConsulClient._logger.info("response for {} {}: {} text={} txn={} headers={}" + .format(operation_name, txn_url, response.status_code, + response.text, json.dumps(txn), + json.dumps(dict(list(response.request.headers.items()))))) + + return True + + + @staticmethod + def store_kvs(kvs): + """put kvs into consul-kv""" + + if not kvs: + ConsulClient._logger.warn("kvs not supplied to store_kvs()") + return + + store_kvs = [ + ConsulClient._gen_txn_operation(ConsulClient.OPERATION_SET, + key, json.dumps(value)) + for key, value in kvs.items() + ] + txn = [] + idx_step = ConsulClient.MAX_OPS_PER_TXN - len(txn) + for idx in range(0, len(store_kvs), idx_step): + txn += store_kvs[idx : idx + idx_step] + if not ConsulClient._run_transaction("store_kvs", txn): + return False + txn = [] + + return ConsulClient._run_transaction("store_kvs", txn) + + + @staticmethod + def delete_key(key): + """delete key from consul-kv""" + + if not key: + ConsulClient._logger.warn("key not supplied to delete_key()") + return + + delete_key = [ + ConsulClient._gen_txn_operation(ConsulClient.OPERATION_DELETE, key) + ] + return ConsulClient._run_transaction("delete_key", delete_key) + + + @staticmethod + def delete_kvs(key): + """delete key from consul-kv""" + + if not key: + ConsulClient._logger.warn("key not supplied to delete_kvs()") + return + + delete_kvs = [ + ConsulClient._gen_txn_operation(ConsulClient.OPERATION_DELETE_FOLDER, key) + ] + return ConsulClient._run_transaction("delete_kvs", delete_kvs) + + + #----- Methods for Config Binding Service + + @staticmethod + def get_service_component(scn): + config = json.dumps(ConsulClient.get_value(scn)) + + try: + dmaap = ConsulClient.get_value(scn + ":dmaap") + except Exception as e: + dmaap = None + if dmaap: + for key in list(dmaap.keys()): + config = re.sub('"<<' + key + '>>"', json.dumps(dmaap[key]), config) + + try: + rel = ConsulClient.get_value(scn + ":rel") + except Exception as e: + rel = None + if rel: + for key in list(rel.keys()): + config = re.sub('"{{' + key + '}}"', json.dumps(rel[key]), config) + + return json.loads(config) + + + @staticmethod + def get_service_component_all(scn, policies_as_list=True): + t_scn = scn + ":" + t_len = len(t_scn) + a_dict = ConsulClient.get_kvs(scn) + b_dict = {} + for key in a_dict: + b_key = None + if key == scn: + b_dict["config"] = ConsulClient.get_service_component(scn) + elif key == scn + ":dmaap": + continue + elif key[0:t_len] == t_scn: + b_key = key[t_len:] + # policies_as_list = True formats policies items in a list like ONAP's CBS; False keeps policy_ids keys + if policies_as_list and b_key == "policies": # convert items from KVs to a values list + b_dict[b_key] = {} + for sub_key in a_dict[key]: + if sub_key == "items": + b_dict[b_key][sub_key] = [] + d_dict = a_dict[key][sub_key] + for item in sorted(d_dict.keys()): # old CBS sorted them so we emulate + b_dict[b_key][sub_key].append(d_dict[item]) + else: + b_dict[b_key][sub_key] = copy.deepcopy(a_dict[key][sub_key]) + else: + b_dict[b_key] = copy.deepcopy(a_dict[key]) + return b_dict + + + @staticmethod + def add_vnf_id(scn, vnf_type, vnf_id, dti_dict): + """ + Add VNF instance to Consul scn:dti key. + + Treat its value as a JSON string representing a dict. + Extend the dict by adding a dti_dict for vnf_id under vnf_type. + Turn the resulting extended dict into a JSON string. + Store the string back into Consul under scn:dti key. + Watch out for conflicting concurrent updates. + """ + + key = scn + ':dti' + lc_vnf_type = vnf_type.lower() + while True: # do until update succeeds + (mod_index, v) = ConsulClient.get_value(key, get_index=True) + lc_v = {ky.lower():vl for ky,vl in list(v.items())} # aware this arbitrarily picks keys that only differ in case + # but DCAE-C doesn't create such keys + + if lc_vnf_type not in lc_v: + return # That VNF type is not supported by this component + lc_v[lc_vnf_type][vnf_id] = dti_dict # add or replace the VNF instance + + updated = ConsulClient.put_value(key, lc_v, cas=mod_index) + if updated: + return lc_v + + + @staticmethod + def delete_vnf_id(scn, vnf_type, vnf_id): + """ + Delete VNF instance from Consul scn:dti key. + + Treat its value as a JSON string representing a dict. + Modify the dict by deleting the vnf_id key entry from under vnf_type. + Turn the resulting extended dict into a JSON string. + Store the string back into Consul under scn:dti key. + Watch out for conflicting concurrent updates. + """ + + key = scn + ':dti' + lc_vnf_type = vnf_type.lower() + while True: # do until update succeeds + (mod_index, v) = ConsulClient.get_value(key, get_index=True) + lc_v = {ky.lower():vl for ky,vl in list(v.items())} # aware this arbitrarily picks keys that only differ in case + # but DCAE-C doesn't create such keys + + if lc_vnf_type not in lc_v: + return # That VNF type is not supported by this component + if vnf_id not in lc_v[lc_vnf_type]: + return lc_v + del lc_v[lc_vnf_type][vnf_id] # delete the VNF instance + + updated = ConsulClient.put_value(key, lc_v, cas=mod_index) + if updated: + return lc_v + + +if __name__ == "__main__": + value = None + + if value: + print(json.dumps(value, sort_keys=True, indent=4, separators=(',', ': '))) |