From 15dd0a3541db1e7ac9f38680d9dbe83cf3d303ae Mon Sep 17 00:00:00 2001 From: vv770d Date: Wed, 17 Mar 2021 22:44:49 +0000 Subject: [DCAE] Remove OTI component OTI has been deprecated Change-Id: Ic2051f9262744081880d8471c31a2491d2e4451c Signed-off-by: vv770d Issue-ID: DCAEGEN2-2676 Signed-off-by: vv770d --- oti/event-handler/otihandler/cfy_client.py | 638 ----------------------------- 1 file changed, 638 deletions(-) delete mode 100644 oti/event-handler/otihandler/cfy_client.py (limited to 'oti/event-handler/otihandler/cfy_client.py') diff --git a/oti/event-handler/otihandler/cfy_client.py b/oti/event-handler/otihandler/cfy_client.py deleted file mode 100644 index 4e3de87..0000000 --- a/oti/event-handler/otihandler/cfy_client.py +++ /dev/null @@ -1,638 +0,0 @@ -# ================================================================================ -# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= - -"""Our client interface to Cloudify""" -import base64 -import copy -import json -import logging -import os - -import requests - -from otihandler.consul_client import ConsulClient - - -class CfyClientConsulError(RuntimeError): - pass - - -class CloudifyClient(object): - """quick replacement for cloudify_rest_client -- this supports pagination and scans all DCAE tenants""" - - def __init__(self, **kwargs): - self._protocol = kwargs.get('protocol', 'http') - self._host = kwargs.get('host') - self._port = kwargs.get('port') - self._headers = kwargs.get('headers') - - self.node_instances = self - - def list(self, **kwargs): - url_mask = "{}://{}:{}/api/v3.1/tenants".format(self._protocol, self._host, self._port) - # s = Session() - # req = Request('GET', url_mask, headers=self._headers) - # prepped = req.prepare() - # response = s.send(prepped,verify=False,timeout=30) - response = requests.get(url_mask, headers=self._headers, timeout=30) - obj = response.json() - tenants = [x["name"] for x in obj["items"]] - tenants_with_containers = [x for x in tenants if 'DCAE' in x] - - size = 1000 - url_mask = "{}://{}:{}/api/v3.1/node-instances?_size={}&_offset={}".format( - self._protocol, self._host, self._port, size, "{}") - if kwargs: - for (key,val) in kwargs.items(): - if isinstance(val, str): - url_mask = url_mask + '&{}={}'.format(key, val) - elif isinstance(val, list): - url_mask = url_mask + '&{}={}'.format(key, ','.join(val)) - - for tenant in tenants_with_containers: - self._headers_with_tenant = copy.deepcopy(self._headers) - self._headers_with_tenant['Tenant'] = tenant - - offset = 0 - total = 1 - while offset < total: - # s = Session() - # req = Request('GET', url_mask.format(offset), headers=self._headers_with_tenant) - # prepped = req.prepare() - # response = s.send(prepped, verify=False, timeout=30) - response = requests.get(url_mask.format(offset), headers=self._headers_with_tenant, timeout=30) - response.raise_for_status() - obj = response.json() - offset = offset + len(obj["items"]) - total = obj["metadata"]["pagination"]["total"] - for item in obj["items"]: - yield NodeInstance(item) - - def update_node_instance(self, node_instance_id, body, **kwargs): - headers = copy.deepcopy(self._headers_with_tenant) - headers['Content-Type'] = "application/json" - url_mask = "{}://{}:{}/api/v3.1/node-instances/{}".format( - self._protocol, self._host, self._port, node_instance_id) - response = requests.patch(url_mask, json=body, headers=headers, timeout=30) - obj = response.json() - return obj - - -class NodeInstance(object): - """quick replacement for cloudify_rest_client""" - - def __init__(self, instance): - self.id = instance.get("id") - self.deployment_id = instance.get("deployment_id") - self.host_id = instance.get("host_id") - self.runtime_properties = instance.get("runtime_properties") - self.relationships = instance.get("relationships") - self.state = instance.get("state") - self.version = instance.get("version") - self.node_id = instance.get("node_id") - self.scaling_groups = instance.get("scaling_groups") - - -class CfyClient(object): - _logger = logging.getLogger("oti_handler.cfy_client") - _client = None - - - @staticmethod - def __set_cloudify_manager_client(): - """Create connection to Cloudify_Manager.""" - - if CfyClient._client: - return - - host = None - port = None - obj = json.loads(os.environ.get("CLOUDIFY", "{}")).get("cloudify") - source = "CLOUDIFY environment variable" - if not obj: - CM_KEY = 'cloudify_manager' - source = "Consul key '{}'".format(CM_KEY) - - try: - results = ConsulClient.lookup_service(CM_KEY) - except Exception as e: - msg = "Unexpected exception {}: {!s} from ConsulClient.lookup_service({})".format(type(e).__name__, e, CM_KEY) - CfyClient._logger.error(msg) - raise CfyClientConsulError(msg) - result = results[0] - host = result['ServiceAddress'] - port = result['ServicePort'] - - try: - obj = ConsulClient.get_value(CM_KEY) - except Exception as e: - msg = "Unexpected exception {}: {!s} from ConsulClient.get_value({})".format(type(e).__name__, e, CM_KEY) - CfyClient._logger.error(msg) - raise CfyClientConsulError(msg) - if not obj: - raise CfyClientConsulError("{} value is empty or invalid".format(source)) - - obj = obj.get('cloudify') - - if not obj: - raise CfyClientConsulError("{} value is missing 'cloudify' key or value".format(source)) - - host = obj.get('address', host) - if not host: - raise CfyClientConsulError("{} value is missing 'cloudify.address'".format(source)) - - port = obj.get('port', port) - if not port: - raise CfyClientConsulError("{} value is missing 'cloudify.port'".format(source)) - - protocol = obj.get('protocol') - if not protocol: - raise CfyClientConsulError("{} value is missing 'cloudify.protocol'".format(source)) - username = obj.get('user') - if not username: - raise CfyClientConsulError("{} value is missing 'cloudify.user'".format(source)) - password = obj.get('password') - if not password: - raise CfyClientConsulError("{} value is missing 'cloudify.password'".format(source)) - - b64_encoded_str = base64.b64encode(bytes("{}:{}".format(username, password), "utf-8")).decode("utf-8") - headers = {'Authorization': 'Basic ' + b64_encoded_str.rstrip('\n')} - #headers = {'Authorization': 'Basic ' + '{}:{}'.format(username, password).encode("base64").rstrip('\n')} - - CfyClient._client = CloudifyClient(host=host, port=port, protocol=protocol, headers=headers) - - - @staticmethod - def query_k8_components(in_cluster_fqdn): - """ - Iterate components that belong to a cluster fqdn. - - Parameters - ---------- - in_cluster_fqdn : string - k8s cluster FQDN - - Returns - ------- - A generator of tuples of component information - [ (proxy_fqdn, namespace, scn, replicas, scn_port), ... ] - """ - - cnt_found = 0 - CfyClient.__set_cloudify_manager_client() - for node_instance in CfyClient._client.node_instances.list(_sort="deployment_id"): - rtp = node_instance.runtime_properties - scn_port = None - cluster_fqdn = None - proxy_fqdn = None - dti_info = rtp.get('dti_info') - if dti_info: - env_items = dti_info.get('env') - for env in env_items: - if env.get("name") == 'KUBE_CLUSTER_FQDN': - cluster_fqdn = env.get("value") - if env.get("name") == 'KUBE_PROXY_FQDN': - proxy_fqdn = env.get("value") - ports = dti_info.get('ports') - if ports: - scn_port = ports[0].split(':')[0] - else: - continue - - if in_cluster_fqdn != cluster_fqdn: - continue - - controller_type = rtp.get('k8s_controller_type') - if not controller_type: - CfyClient._logger.debug("controller type is missing") - continue - elif controller_type != "statefulset": - CfyClient._logger.debug("not a stateful set") - continue - - container_id = rtp.get('k8s_deployment') - if not container_id: - CfyClient._logger.debug("{} {} runtime_properties has no container_id or k8s_deployment".format( - node_instance.deployment_id, node_instance.id)) - continue - - try: - namespace = container_id.get('namespace') - except: - namespace = '' - pass - - replicas = 1 - try: - replicas = rtp.get('replicas') - except: - pass - - scn = rtp.get('service_component_name') - if not scn: - CfyClient._logger.debug( - "{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id, - node_instance.id)) - continue - - cnt_found += 1 - yield (proxy_fqdn, namespace, scn, replicas, scn_port) - continue - - msg = "Found {} components (collectors) for cluster={}" \ - .format(cnt_found, in_cluster_fqdn) - CfyClient._logger.debug(msg) - - - @staticmethod - def iter_components(dcae_target_type, dcae_service_location='', component_type=''): - """ - Iterate components that handle a given dcae_target_type. - - Parameters - ---------- - dcae_target_type : string - VNF Type - dcae_service_location : string - Location of the component (optional) - component_type : string - Type of the component (optional) - - Returns - ------- - A generator of tuples of component information - [ (scn, deployment_id, container_id, node_id, node_instance_id, node_instance_state, docker_host, reconfig_script, "docker"), ... ] - or - [ (scn, deployment_id, k8s_deployment, node_id, node_instance_id, node_instance_state, config_content, reconfig_script, "k8s"), ... ] - - """ - - cnt_found = 0 - - # get dockerhost and kubernetes_master services that are TAGged for the dcae_service_location (CLLI) - dockerhosts = [] - k8s_svcs_tagged_with_clli = [] - if dcae_service_location: - try: - dockerhosts = ConsulClient.search_services("-component-dockerhost-", [dcae_service_location]) - except Exception as e: - msg = "Unexpected exception {}: {!s} from ConsulClient.search_services({}, {!s})".format(type(e).__name__, e, "-component-dockerhost-", [dcae_service_location]) - CfyClient._logger.error(msg) - raise CfyClientConsulError(msg) - try: - k8s_svcs_tagged_with_clli = ConsulClient.search_services("_component_kubernetes_master", [dcae_service_location]) - except Exception as e: - msg = "Unexpected exception {}: {!s} from ConsulClient.search_services({}, {!s})".format(type(e).__name__, e, "_component_kubernetes_master", [dcae_service_location]) - CfyClient._logger.error(msg) - raise CfyClientConsulError(msg) - - CfyClient.__set_cloudify_manager_client() - for node_instance in CfyClient._client.node_instances.list(_sort="deployment_id"): - rtp = node_instance.runtime_properties - - # Skip this node_instance if it is not a collector - container_type = "docker" - container_id = rtp.get('container_id') - docker_host = '' - svc_with_my_clli_tags = '' - if not container_id: - container_type = "k8s" - container_id = rtp.get('k8s_deployment') - if not container_id: - CfyClient._logger.debug("{} {} runtime_properties has no container_id or k8s_deployment".format(node_instance.deployment_id, node_instance.id)) - continue - docker_config = rtp.get('docker_config') - if not docker_config: - CfyClient._logger.debug("{} {} runtime_properties has no docker_config".format(node_instance.deployment_id, node_instance.id)) - continue - dti_reconfig_script = "" - if container_type == "docker": - dti_reconfig_script = rtp.get('dti_reconfig_script') - if not dti_reconfig_script: - CfyClient._logger.debug("{} {} runtime_properties has no dti_reconfig_script".format(node_instance.deployment_id, node_instance.id)) - continue - elif container_type == "k8s": - dti_reconfig_script = docker_config.get('reconfigs',{}).get('dti') - if not dti_reconfig_script: - CfyClient._logger.debug("{} {} runtime_properties docker_config has no reconfigs.dti".format(node_instance.deployment_id, node_instance.id)) - continue - - scn = rtp.get('service_component_name') - scn_address = None - scn_port = None - if not scn: - CfyClient._logger.debug("{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id, node_instance.id)) - continue - if container_type == "docker": - docker_host = rtp.get('selected_container_destination') - if not docker_host: - CfyClient._logger.debug("{} {} runtime_properties has no selected_container_destination".format(node_instance.deployment_id, node_instance.id)) - continue - elif container_type == "k8s": - try: - srvcCatalogItem = ConsulClient.lookup_service(scn)[0] - scn_address = srvcCatalogItem.get("ServiceAddress") - except: - CfyClient._logger.debug( - "{} {} runtime_properties has no consul svc catalog registry".format(node_instance.deployment_id, - node_instance.id)) - continue - svc_with_my_clli_tags = rtp.get('svc_with_my_clli_tags') - # e.g., scn="s908d92e232ed43..." - if not svc_with_my_clli_tags: - # We should not incur this burden. k8splugin should store this into runtime properties. - try: - node_name = srvcCatalogItem.get("Node") - if node_name: - # e.g., node_name="zldcdyh1adce3kpma00" - services = ConsulClient.lookup_node(node_name).get("Services") - if services: - for node_svc in list(services.keys()): - if "_component_kubernetes_master" in node_svc: - # e.g., node_svc="zldcdyh1adce3_kp_component_kubernetes_master" - svc_with_my_clli_tags = node_svc - break - except: - pass - # ... cache results we find into runtime properties to avoid searching again - if svc_with_my_clli_tags: - CfyClient._logger.debug("{} {} storing runtime property svc_with_my_clli_tags={}".format( - node_instance.deployment_id, node_instance.id, svc_with_my_clli_tags)) - rtp['svc_with_my_clli_tags'] = svc_with_my_clli_tags - body = { - "runtime_properties": rtp, - "state": node_instance.state, - "version": 1 + int(node_instance.version) - } - try: - CfyClient._client.update_node_instance(node_instance.id, body) - except: - pass - - if not svc_with_my_clli_tags: - CfyClient._logger.debug("{} {} runtime_properties has no svc_with_my_clli_tags".format(node_instance.deployment_id, node_instance.id)) - continue - - # get the nodeport for statefulset sidecar service - dti_info = rtp.get('dti_info') - if dti_info: - ports = dti_info.get('ports') - if ports: - scn_port = ports[0].split(':')[1] - docker_host = rtp.get('configuration',{}).get('file_content') - if not docker_host: - CfyClient._logger.debug("{} {} runtime_properties has no configuration.file_content".format(node_instance.deployment_id, node_instance.id)) - continue - - # If DTI Event specifies dcae_service_location, then collector's dockerhost service in Consul must have that TAG - if dcae_service_location: - if container_type == "docker" and docker_host not in dockerhosts: - CfyClient._logger.debug("{} {} dockerhost {} is not TAGged with DTI Event dcae_service_location {}" - .format(node_instance.deployment_id, node_instance.id, docker_host, dcae_service_location)) - continue - elif container_type == "k8s" and svc_with_my_clli_tags not in k8s_svcs_tagged_with_clli: - CfyClient._logger.debug("{} {} svc_with_my_clli_tags {} is not TAGged with DTI Event dcae_service_location {}" - .format(node_instance.deployment_id, node_instance.id, svc_with_my_clli_tags, dcae_service_location)) - continue - - # If DTI Event specifies component_type, then collector's service_component_type must match - if component_type: - c_component_type = rtp.get('service_component_type') - if component_type != c_component_type: - CfyClient._logger.debug("{} {} component_types don't match".format(node_instance.deployment_id, node_instance.id)) - continue - - # Check if the collector supports this VNF Type - # scn:oti Consul key is authoritative for vnfTypes that a collector supports (not docker_config) - dti_key = scn + ':oti' - try: - obj = ConsulClient.get_value(dti_key) - except Exception as e: - CfyClient._logger.error( - "Unexpected exception {}: {!s} from ConsulClient.get_value({}) for {} {}" - .format(type(e).__name__, e, dti_key, node_instance.deployment_id, node_instance.id) - ) - continue - if not obj: - CfyClient._logger.debug("{} {} Consul key '{}' is empty or invalid".format(node_instance.deployment_id, node_instance.id, dti_key)) - continue - obj_types = set(k.lower() for k in obj) - if dcae_target_type.lower() in obj_types: - CfyClient._logger.debug("{} {} is a valid collector for VNF Type {}".format(node_instance.deployment_id, node_instance.id, dcae_target_type)) - cnt_found += 1 - yield (scn, node_instance.deployment_id, container_id, node_instance.node_id, node_instance.id, node_instance.state, docker_host, dti_reconfig_script, container_type, scn_address, scn_port ) - continue - else: - CfyClient._logger.debug("{} {} VNF Type {} is not in Consul key '{}'".format(node_instance.deployment_id, node_instance.id, dcae_target_type, dti_key)) - continue - - msg = "Found {} components (collectors) for dcae_target_type={}, dcae_service_location={}, component_type={}"\ - .format(cnt_found, dcae_target_type, dcae_service_location, component_type) - CfyClient._logger.debug(msg) - - @staticmethod - def iter_components_for_docker(dcae_target_type, dcae_service_location='', component_type=''): - """ - Iterate components that handle a given dcae_target_type to find the components of docker type - - Parameters - ---------- - dcae_target_type : string - VNF Type - dcae_service_location : string - Location of the component (optional) - component_type : string - Type of the component (optional) - - Returns - ------- - A generator of tuples of component information - [ (scn, deployment_id, container_id, node_id, node_instance_id, node_instance_state, docker_host, reconfig_script, "docker"), ... ] - - """ - - cnt_found = 0 - # get dockerhost and kubernetes_master services that are TAGged for the dcae_service_location (CLLI) - dockerhosts = [] - - if dcae_service_location: - try: - dockerhosts = ConsulClient.search_services("-component-dockerhost-", [dcae_service_location]) - except Exception as e: - msg = "Unexpected exception {}: {!s} from ConsulClient.search_services({}, {!s})".format( - type(e).__name__, e, "-component-dockerhost-", [dcae_service_location]) - CfyClient._logger.error(msg) - raise CfyClientConsulError(msg) - - CfyClient.__set_cloudify_manager_client() - for node_instance in CfyClient._client.node_instances.list(_sort="deployment_id"): - rtp = node_instance.runtime_properties - - # Skip this node_instance if it is not a collector - container_type = "docker" - container_id = rtp.get('container_id') - if not container_id: - if not container_id: - CfyClient._logger.debug("{} {} runtime_properties has no container_id".format( - node_instance.deployment_id, node_instance.id)) - continue - docker_config = rtp.get('docker_config') - if not docker_config: - CfyClient._logger.debug( - "{} {} runtime_properties has no docker_config".format(node_instance.deployment_id, - node_instance.id)) - continue - dti_reconfig_script = "" - dti_reconfig_script = rtp.get('dti_reconfig_script') - if not dti_reconfig_script: - CfyClient._logger.debug( - "{} {} runtime_properties has no dti_reconfig_script".format(node_instance.deployment_id, - node_instance.id)) - continue - scn = rtp.get('service_component_name') - if not scn: - CfyClient._logger.debug( - "{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id, - node_instance.id)) - continue - docker_host = rtp.get('selected_container_destination') - if not docker_host: - CfyClient._logger.debug("{} {} runtime_properties has no selected_container_destination".format( - node_instance.deployment_id, node_instance.id)) - continue - - # If DTI Event specifies dcae_service_location, then collector's dockerhost service in Consul must have that TAG - if dcae_service_location: - if docker_host not in dockerhosts: - CfyClient._logger.debug("{} {} dockerhost {} is not TAGged with DTI Event dcae_service_location {}" - .format(node_instance.deployment_id, node_instance.id, docker_host, - dcae_service_location)) - continue - - # If DTI Event specifies component_type, then collector's service_component_type must match - if component_type: - c_component_type = rtp.get('service_component_type') - if component_type != c_component_type: - CfyClient._logger.debug( - "{} {} component_types don't match".format(node_instance.deployment_id, node_instance.id)) - continue - - # Check if the collector supports this VNF Type - # scn:oti Consul key is authoritative for vnfTypes that a collector supports (not docker_config) - dti_key = scn + ':oti' - try: - obj = ConsulClient.get_value(dti_key) - except Exception as e: - CfyClient._logger.error( - "Unexpected exception {}: {!s} from ConsulClient.get_value({}) for {} {}" - .format(type(e).__name__, e, dti_key, node_instance.deployment_id, node_instance.id) - ) - continue - if not obj: - CfyClient._logger.debug( - "{} {} Consul key '{}' is empty or invalid".format(node_instance.deployment_id, node_instance.id, - dti_key)) - continue - obj_types = set(k.lower() for k in obj) - if dcae_target_type.lower() in obj_types: - CfyClient._logger.debug( - "{} {} is a valid collector for VNF Type {}".format(node_instance.deployment_id, node_instance.id, - dcae_target_type)) - cnt_found += 1 - yield (scn, node_instance.deployment_id, container_id, node_instance.node_id, node_instance.id, - node_instance.state, docker_host, dti_reconfig_script, container_type, '', '') - continue - else: - CfyClient._logger.debug( - "{} {} VNF Type {} is not in Consul key '{}'".format(node_instance.deployment_id, node_instance.id, - dcae_target_type, dti_key)) - continue - - msg = "Found {} components (collectors) for dcae_target_type={}, dcae_service_location={}, component_type={}" \ - .format(cnt_found, dcae_target_type, dcae_service_location, component_type) - CfyClient._logger.debug(msg) - - - @staticmethod - def iter_components_of_deployment(deployment_id, node_id=None, reconfig_type="app"): - """ - Iterate components of a specific deployment_id. - - Parameters - ---------- - deployment_id : string - Cloudify deployment ID that created the component(s). - node_id : string - Cloudify node ID that created the component. - reconfig_type : string - "app" - - Returns - ------- - A generator of tuples of component information - [ (scn, deployment_id, container_id, node_id, node_instance_id, node_instance_state, docker_host, reconfig_script, "docker"), ... ] - or - [ (scn, deployment_id, k8s_deployment, node_id, node_instance_id, node_instance_state, config_content, reconfig_script, "k8s"), ... ] - - """ - - cnt_found = 0 - - CfyClient.__set_cloudify_manager_client() - for node_instance in CfyClient._client.node_instances.list( - deployment_id=deployment_id, - _include=['id','node_id','deployment_id','state','runtime_properties'] - ): - if node_id and node_instance.node_id != node_id: - continue - - rtp = node_instance.runtime_properties - - # Skip this node_instance if it is not a collector - container_type = "docker" - container_id = rtp.get('container_id') - if not container_id: - container_type = "k8s" - container_id = rtp.get('k8s_deployment') - if not container_id: - CfyClient._logger.debug("{} {} runtime_properties has no container_id or k8s_deployment".format(node_instance.deployment_id, node_instance.id)) - continue - reconfig_script = rtp.get('docker_config',{}).get('reconfigs',{}).get(reconfig_type) - if not reconfig_script: - CfyClient._logger.debug("{} {} runtime_properties has no docker_config.reconfigs.{}".format(node_instance.deployment_id, node_instance.id, reconfig_type)) - continue - scn = rtp.get('service_component_name') - if not scn: - CfyClient._logger.debug("{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id, node_instance.id)) - continue - if container_type == "docker": - docker_host = rtp.get('selected_container_destination') - if not docker_host: - CfyClient._logger.debug("{} {} runtime_properties has no selected_container_destination".format(node_instance.deployment_id, node_instance.id)) - continue - elif container_type == "k8s": - docker_host = rtp.get('configuration',{}).get('file_content') - if not docker_host: - CfyClient._logger.debug("{} {} runtime_properties has no configuration.file_content".format(node_instance.deployment_id, node_instance.id)) - continue - - CfyClient._logger.debug("{} {} is a {}-reconfigurable component".format(node_instance.deployment_id, node_instance.id, reconfig_type)) - cnt_found += 1 - yield (scn, node_instance.deployment_id, container_id, node_instance.node_id, node_instance.id, node_instance.state, docker_host, reconfig_script, container_type) - continue - - msg = "Found {} {}-reconfigurable components".format(cnt_found, reconfig_type) - CfyClient._logger.debug(msg) -- cgit 1.2.3-korg