summaryrefslogtreecommitdiffstats
path: root/oti/event-handler/otihandler/cfy_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'oti/event-handler/otihandler/cfy_client.py')
-rw-r--r--oti/event-handler/otihandler/cfy_client.py638
1 files changed, 638 insertions, 0 deletions
diff --git a/oti/event-handler/otihandler/cfy_client.py b/oti/event-handler/otihandler/cfy_client.py
new file mode 100644
index 0000000..c823340
--- /dev/null
+++ b/oti/event-handler/otihandler/cfy_client.py
@@ -0,0 +1,638 @@
+# ================================================================================
+# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+"""Our client interface to Cloudify"""
+import base64
+import copy
+import json
+import logging
+import os
+
+import requests
+
+from otihandler.consul_client import ConsulClient
+
+
+class CfyClientConsulError(RuntimeError):
+ pass
+
+
+class CloudifyClient(object):
+ """quick replacement for cloudify_rest_client -- this supports pagination and scans all DCAE tenants"""
+
+ def __init__(self, **kwargs):
+ self._protocol = kwargs.get('protocol', 'http')
+ self._host = kwargs.get('host')
+ self._port = kwargs.get('port')
+ self._headers = kwargs.get('headers')
+
+ self.node_instances = self
+
+ def list(self, **kwargs):
+ url_mask = "{}://{}:{}/api/v3.1/tenants".format(self._protocol, self._host, self._port)
+ # s = Session()
+ # req = Request('GET', url_mask, headers=self._headers)
+ # prepped = req.prepare()
+ # response = s.send(prepped,verify=False,timeout=30)
+ response = requests.get(url_mask, headers=self._headers, timeout=30)
+ obj = response.json()
+ tenants = [x["name"] for x in obj["items"]]
+ tenants_with_containers = [x for x in tenants if 'DCAE' in x]
+
+ size = 1000
+ url_mask = "{}://{}:{}/api/v3.1/node-instances?_size={}&_offset={}".format(
+ self._protocol, self._host, self._port, size, "{}")
+ if kwargs:
+ for (key,val) in kwargs.items():
+ if isinstance(val, str):
+ url_mask = url_mask + '&{}={}'.format(key, val)
+ elif isinstance(val, list):
+ url_mask = url_mask + '&{}={}'.format(key, ','.join(val))
+
+ for tenant in tenants_with_containers:
+ self._headers_with_tenant = copy.deepcopy(self._headers)
+ self._headers_with_tenant['Tenant'] = tenant
+
+ offset = 0
+ total = 1
+ while offset < total:
+ # s = Session()
+ # req = Request('GET', url_mask.format(offset), headers=self._headers_with_tenant)
+ # prepped = req.prepare()
+ # response = s.send(prepped, verify=False, timeout=30)
+ response = requests.get(url_mask.format(offset), headers=self._headers_with_tenant, timeout=30)
+ response.raise_for_status()
+ obj = response.json()
+ offset = offset + len(obj["items"])
+ total = obj["metadata"]["pagination"]["total"]
+ for item in obj["items"]:
+ yield NodeInstance(item)
+
+ def update_node_instance(self, node_instance_id, body, **kwargs):
+ headers = copy.deepcopy(self._headers_with_tenant)
+ headers['Content-Type'] = "application/json"
+ url_mask = "{}://{}:{}/api/v3.1/node-instances/{}".format(
+ self._protocol, self._host, self._port, node_instance_id)
+ response = requests.patch(url_mask, json=body, headers=headers, timeout=30)
+ obj = response.json()
+ return obj
+
+
+class NodeInstance(object):
+ """quick replacement for cloudify_rest_client"""
+
+ def __init__(self, instance):
+ self.id = instance.get("id")
+ self.deployment_id = instance.get("deployment_id")
+ self.host_id = instance.get("host_id")
+ self.runtime_properties = instance.get("runtime_properties")
+ self.relationships = instance.get("relationships")
+ self.state = instance.get("state")
+ self.version = instance.get("version")
+ self.node_id = instance.get("node_id")
+ self.scaling_groups = instance.get("scaling_groups")
+
+
+class CfyClient(object):
+ _logger = logging.getLogger("oti_handler.cfy_client")
+ _client = None
+
+
+ @staticmethod
+ def __set_cloudify_manager_client():
+ """Create connection to Cloudify_Manager."""
+
+ if CfyClient._client:
+ return
+
+ host = None
+ port = None
+ obj = json.loads(os.environ.get("CLOUDIFY", "{}")).get("cloudify")
+ source = "CLOUDIFY environment variable"
+ if not obj:
+ CM_KEY = 'cloudify_manager'
+ source = "Consul key '{}'".format(CM_KEY)
+
+ try:
+ results = ConsulClient.lookup_service(CM_KEY)
+ except Exception as e:
+ msg = "Unexpected exception {}: {!s} from ConsulClient.lookup_service({})".format(type(e).__name__, e, CM_KEY)
+ CfyClient._logger.error(msg)
+ raise CfyClientConsulError(msg)
+ result = results[0]
+ host = result['ServiceAddress']
+ port = result['ServicePort']
+
+ try:
+ obj = ConsulClient.get_value(CM_KEY)
+ except Exception as e:
+ msg = "Unexpected exception {}: {!s} from ConsulClient.get_value({})".format(type(e).__name__, e, CM_KEY)
+ CfyClient._logger.error(msg)
+ raise CfyClientConsulError(msg)
+ if not obj:
+ raise CfyClientConsulError("{} value is empty or invalid".format(source))
+
+ obj = obj.get('cloudify')
+
+ if not obj:
+ raise CfyClientConsulError("{} value is missing 'cloudify' key or value".format(source))
+
+ host = obj.get('address', host)
+ if not host:
+ raise CfyClientConsulError("{} value is missing 'cloudify.address'".format(source))
+
+ port = obj.get('port', port)
+ if not port:
+ raise CfyClientConsulError("{} value is missing 'cloudify.port'".format(source))
+
+ protocol = obj.get('protocol')
+ if not protocol:
+ raise CfyClientConsulError("{} value is missing 'cloudify.protocol'".format(source))
+ username = obj.get('user')
+ if not username:
+ raise CfyClientConsulError("{} value is missing 'cloudify.user'".format(source))
+ password = obj.get('password')
+ if not password:
+ raise CfyClientConsulError("{} value is missing 'cloudify.password'".format(source))
+
+ b64_encoded_str = base64.b64encode(bytes("{}:{}".format(username, password), 'utf-8')).decode("utf-8")
+ headers = {'Authorization': 'Basic ' + b64_encoded_str.rstrip('\n')}
+ #headers = {'Authorization': 'Basic ' + '{}:{}'.format(username, password).encode("base64").rstrip('\n')}
+
+ CfyClient._client = CloudifyClient(host=host, port=port, protocol=protocol, headers=headers)
+
+
+ @staticmethod
+ def query_k8_components(in_cluster_fqdn):
+ """
+ Iterate components that belong to a cluster fqdn.
+
+ Parameters
+ ----------
+ in_cluster_fqdn : string
+ k8s cluster FQDN
+
+ Returns
+ -------
+ A generator of tuples of component information
+ [ (proxy_fqdn, namespace, scn, replicas, scn_port), ... ]
+ """
+
+ cnt_found = 0
+ CfyClient.__set_cloudify_manager_client()
+ for node_instance in CfyClient._client.node_instances.list(_sort="deployment_id"):
+ rtp = node_instance.runtime_properties
+ scn_port = None
+ cluster_fqdn = None
+ proxy_fqdn = None
+ dti_info = rtp.get('dti_info')
+ if dti_info:
+ env_items = dti_info.get('env')
+ for env in env_items:
+ if env.get("name") == 'KUBE_CLUSTER_FQDN':
+ cluster_fqdn = env.get("value")
+ if env.get("name") == 'KUBE_PROXY_FQDN':
+ proxy_fqdn = env.get("value")
+ ports = dti_info.get('ports')
+ if ports:
+ scn_port = ports[0].split(':')[0]
+ else:
+ continue
+
+ if in_cluster_fqdn != cluster_fqdn:
+ continue
+
+ controller_type = rtp.get('k8s_controller_type')
+ if not controller_type:
+ CfyClient._logger.debug("controller type is missing")
+ continue
+ elif controller_type != "statefulset":
+ CfyClient._logger.debug("not a stateful set")
+ continue
+
+ container_id = rtp.get('k8s_deployment')
+ if not container_id:
+ CfyClient._logger.debug("{} {} runtime_properties has no container_id or k8s_deployment".format(
+ node_instance.deployment_id, node_instance.id))
+ continue
+
+ try:
+ namespace = container_id.get('namespace')
+ except:
+ namespace = ''
+ pass
+
+ replicas = 1
+ try:
+ replicas = rtp.get('replicas')
+ except:
+ pass
+
+ scn = rtp.get('service_component_name')
+ if not scn:
+ CfyClient._logger.debug(
+ "{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id,
+ node_instance.id))
+ continue
+
+ cnt_found += 1
+ yield (proxy_fqdn, namespace, scn, replicas, scn_port)
+ continue
+
+ msg = "Found {} components (collectors) for cluster={}" \
+ .format(cnt_found, in_cluster_fqdn)
+ CfyClient._logger.debug(msg)
+
+
+ @staticmethod
+ def iter_components(dcae_target_type, dcae_service_location='', component_type=''):
+ """
+ Iterate components that handle a given dcae_target_type.
+
+ Parameters
+ ----------
+ dcae_target_type : string
+ VNF Type
+ dcae_service_location : string
+ Location of the component (optional)
+ component_type : string
+ Type of the component (optional)
+
+ Returns
+ -------
+ A generator of tuples of component information
+ [ (scn, deployment_id, container_id, node_id, node_instance_id, node_instance_state, docker_host, reconfig_script, "docker"), ... ]
+ or
+ [ (scn, deployment_id, k8s_deployment, node_id, node_instance_id, node_instance_state, config_content, reconfig_script, "k8s"), ... ]
+
+ """
+
+ cnt_found = 0
+
+ # get dockerhost and kubernetes_master services that are TAGged for the dcae_service_location (CLLI)
+ dockerhosts = []
+ k8s_svcs_tagged_with_clli = []
+ if dcae_service_location:
+ try:
+ dockerhosts = ConsulClient.search_services("-component-dockerhost-", [dcae_service_location])
+ except Exception as e:
+ msg = "Unexpected exception {}: {!s} from ConsulClient.search_services({}, {!s})".format(type(e).__name__, e, "-component-dockerhost-", [dcae_service_location])
+ CfyClient._logger.error(msg)
+ raise CfyClientConsulError(msg)
+ try:
+ k8s_svcs_tagged_with_clli = ConsulClient.search_services("_component_kubernetes_master", [dcae_service_location])
+ except Exception as e:
+ msg = "Unexpected exception {}: {!s} from ConsulClient.search_services({}, {!s})".format(type(e).__name__, e, "_component_kubernetes_master", [dcae_service_location])
+ CfyClient._logger.error(msg)
+ raise CfyClientConsulError(msg)
+
+ CfyClient.__set_cloudify_manager_client()
+ for node_instance in CfyClient._client.node_instances.list(_sort="deployment_id"):
+ rtp = node_instance.runtime_properties
+
+ # Skip this node_instance if it is not a collector
+ container_type = "docker"
+ container_id = rtp.get('container_id')
+ docker_host = ''
+ svc_with_my_clli_tags = ''
+ if not container_id:
+ container_type = "k8s"
+ container_id = rtp.get('k8s_deployment')
+ if not container_id:
+ CfyClient._logger.debug("{} {} runtime_properties has no container_id or k8s_deployment".format(node_instance.deployment_id, node_instance.id))
+ continue
+ docker_config = rtp.get('docker_config')
+ if not docker_config:
+ CfyClient._logger.debug("{} {} runtime_properties has no docker_config".format(node_instance.deployment_id, node_instance.id))
+ continue
+ dti_reconfig_script = ""
+ if container_type == "docker":
+ dti_reconfig_script = rtp.get('dti_reconfig_script')
+ if not dti_reconfig_script:
+ CfyClient._logger.debug("{} {} runtime_properties has no dti_reconfig_script".format(node_instance.deployment_id, node_instance.id))
+ continue
+ elif container_type == "k8s":
+ dti_reconfig_script = docker_config.get('reconfigs',{}).get('dti')
+ if not dti_reconfig_script:
+ CfyClient._logger.debug("{} {} runtime_properties docker_config has no reconfigs.dti".format(node_instance.deployment_id, node_instance.id))
+ continue
+
+ scn = rtp.get('service_component_name')
+ scn_address = None
+ scn_port = None
+ if not scn:
+ CfyClient._logger.debug("{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id, node_instance.id))
+ continue
+ if container_type == "docker":
+ docker_host = rtp.get('selected_container_destination')
+ if not docker_host:
+ CfyClient._logger.debug("{} {} runtime_properties has no selected_container_destination".format(node_instance.deployment_id, node_instance.id))
+ continue
+ elif container_type == "k8s":
+ try:
+ srvcCatalogItem = ConsulClient.lookup_service(scn)[0]
+ scn_address = srvcCatalogItem.get("ServiceAddress")
+ except:
+ CfyClient._logger.debug(
+ "{} {} runtime_properties has no consul svc catalog registry".format(node_instance.deployment_id,
+ node_instance.id))
+ continue
+ svc_with_my_clli_tags = rtp.get('svc_with_my_clli_tags')
+ # e.g., scn="s908d92e232ed43..."
+ if not svc_with_my_clli_tags:
+ # We should not incur this burden. k8splugin should store this into runtime properties.
+ try:
+ node_name = srvcCatalogItem.get("Node")
+ if node_name:
+ # e.g., node_name="zldcdyh1adce3kpma00"
+ services = ConsulClient.lookup_node(node_name).get("Services")
+ if services:
+ for node_svc in list(services.keys()):
+ if "_component_kubernetes_master" in node_svc:
+ # e.g., node_svc="zldcdyh1adce3_kp_component_kubernetes_master"
+ svc_with_my_clli_tags = node_svc
+ break
+ except:
+ pass
+ # ... cache results we find into runtime properties to avoid searching again
+ if svc_with_my_clli_tags:
+ CfyClient._logger.debug("{} {} storing runtime property svc_with_my_clli_tags={}".format(
+ node_instance.deployment_id, node_instance.id, svc_with_my_clli_tags))
+ rtp['svc_with_my_clli_tags'] = svc_with_my_clli_tags
+ body = {
+ "runtime_properties": rtp,
+ "state": node_instance.state,
+ "version": 1 + int(node_instance.version)
+ }
+ try:
+ CfyClient._client.update_node_instance(node_instance.id, body)
+ except:
+ pass
+
+ if not svc_with_my_clli_tags:
+ CfyClient._logger.debug("{} {} runtime_properties has no svc_with_my_clli_tags".format(node_instance.deployment_id, node_instance.id))
+ continue
+
+ # get the nodeport for statefulset sidecar service
+ dti_info = rtp.get('dti_info')
+ if dti_info:
+ ports = dti_info.get('ports')
+ if ports:
+ scn_port = ports[0].split(':')[1]
+ docker_host = rtp.get('configuration',{}).get('file_content')
+ if not docker_host:
+ CfyClient._logger.debug("{} {} runtime_properties has no configuration.file_content".format(node_instance.deployment_id, node_instance.id))
+ continue
+
+ # If DTI Event specifies dcae_service_location, then collector's dockerhost service in Consul must have that TAG
+ if dcae_service_location:
+ if container_type == "docker" and docker_host not in dockerhosts:
+ CfyClient._logger.debug("{} {} dockerhost {} is not TAGged with DTI Event dcae_service_location {}"
+ .format(node_instance.deployment_id, node_instance.id, docker_host, dcae_service_location))
+ continue
+ elif container_type == "k8s" and svc_with_my_clli_tags not in k8s_svcs_tagged_with_clli:
+ CfyClient._logger.debug("{} {} svc_with_my_clli_tags {} is not TAGged with DTI Event dcae_service_location {}"
+ .format(node_instance.deployment_id, node_instance.id, svc_with_my_clli_tags, dcae_service_location))
+ continue
+
+ # If DTI Event specifies component_type, then collector's service_component_type must match
+ if component_type:
+ c_component_type = rtp.get('service_component_type')
+ if component_type != c_component_type:
+ CfyClient._logger.debug("{} {} component_types don't match".format(node_instance.deployment_id, node_instance.id))
+ continue
+
+ # Check if the collector supports this VNF Type
+ # scn:dti Consul key is authoritative for vnfTypes that a collector supports (not docker_config)
+ dti_key = scn + ':dti'
+ try:
+ obj = ConsulClient.get_value(dti_key)
+ except Exception as e:
+ CfyClient._logger.error(
+ "Unexpected exception {}: {!s} from ConsulClient.get_value({}) for {} {}"
+ .format(type(e).__name__, e, dti_key, node_instance.deployment_id, node_instance.id)
+ )
+ continue
+ if not obj:
+ CfyClient._logger.debug("{} {} Consul key '{}' is empty or invalid".format(node_instance.deployment_id, node_instance.id, dti_key))
+ continue
+ obj_types = set(k.lower() for k in obj)
+ if dcae_target_type.lower() in obj_types:
+ CfyClient._logger.debug("{} {} is a valid collector for VNF Type {}".format(node_instance.deployment_id, node_instance.id, dcae_target_type))
+ cnt_found += 1
+ yield (scn, node_instance.deployment_id, container_id, node_instance.node_id, node_instance.id, node_instance.state, docker_host, dti_reconfig_script, container_type, scn_address, scn_port )
+ continue
+ else:
+ CfyClient._logger.debug("{} {} VNF Type {} is not in Consul key '{}'".format(node_instance.deployment_id, node_instance.id, dcae_target_type, dti_key))
+ continue
+
+ msg = "Found {} components (collectors) for dcae_target_type={}, dcae_service_location={}, component_type={}"\
+ .format(cnt_found, dcae_target_type, dcae_service_location, component_type)
+ CfyClient._logger.debug(msg)
+
+ @staticmethod
+ def iter_components_for_docker(dcae_target_type, dcae_service_location='', component_type=''):
+ """
+ Iterate components that handle a given dcae_target_type to find the components of docker type
+
+ Parameters
+ ----------
+ dcae_target_type : string
+ VNF Type
+ dcae_service_location : string
+ Location of the component (optional)
+ component_type : string
+ Type of the component (optional)
+
+ Returns
+ -------
+ A generator of tuples of component information
+ [ (scn, deployment_id, container_id, node_id, node_instance_id, node_instance_state, docker_host, reconfig_script, "docker"), ... ]
+
+ """
+
+ cnt_found = 0
+ # get dockerhost and kubernetes_master services that are TAGged for the dcae_service_location (CLLI)
+ dockerhosts = []
+
+ if dcae_service_location:
+ try:
+ dockerhosts = ConsulClient.search_services("-component-dockerhost-", [dcae_service_location])
+ except Exception as e:
+ msg = "Unexpected exception {}: {!s} from ConsulClient.search_services({}, {!s})".format(
+ type(e).__name__, e, "-component-dockerhost-", [dcae_service_location])
+ CfyClient._logger.error(msg)
+ raise CfyClientConsulError(msg)
+
+ CfyClient.__set_cloudify_manager_client()
+ for node_instance in CfyClient._client.node_instances.list(_sort="deployment_id"):
+ rtp = node_instance.runtime_properties
+
+ # Skip this node_instance if it is not a collector
+ container_type = "docker"
+ container_id = rtp.get('container_id')
+ if not container_id:
+ if not container_id:
+ CfyClient._logger.debug("{} {} runtime_properties has no container_id".format(
+ node_instance.deployment_id, node_instance.id))
+ continue
+ docker_config = rtp.get('docker_config')
+ if not docker_config:
+ CfyClient._logger.debug(
+ "{} {} runtime_properties has no docker_config".format(node_instance.deployment_id,
+ node_instance.id))
+ continue
+ dti_reconfig_script = ""
+ dti_reconfig_script = rtp.get('dti_reconfig_script')
+ if not dti_reconfig_script:
+ CfyClient._logger.debug(
+ "{} {} runtime_properties has no dti_reconfig_script".format(node_instance.deployment_id,
+ node_instance.id))
+ continue
+ scn = rtp.get('service_component_name')
+ if not scn:
+ CfyClient._logger.debug(
+ "{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id,
+ node_instance.id))
+ continue
+ docker_host = rtp.get('selected_container_destination')
+ if not docker_host:
+ CfyClient._logger.debug("{} {} runtime_properties has no selected_container_destination".format(
+ node_instance.deployment_id, node_instance.id))
+ continue
+
+ # If DTI Event specifies dcae_service_location, then collector's dockerhost service in Consul must have that TAG
+ if dcae_service_location:
+ if docker_host not in dockerhosts:
+ CfyClient._logger.debug("{} {} dockerhost {} is not TAGged with DTI Event dcae_service_location {}"
+ .format(node_instance.deployment_id, node_instance.id, docker_host,
+ dcae_service_location))
+ continue
+
+ # If DTI Event specifies component_type, then collector's service_component_type must match
+ if component_type:
+ c_component_type = rtp.get('service_component_type')
+ if component_type != c_component_type:
+ CfyClient._logger.debug(
+ "{} {} component_types don't match".format(node_instance.deployment_id, node_instance.id))
+ continue
+
+ # Check if the collector supports this VNF Type
+ # scn:dti Consul key is authoritative for vnfTypes that a collector supports (not docker_config)
+ dti_key = scn + ':dti'
+ try:
+ obj = ConsulClient.get_value(dti_key)
+ except Exception as e:
+ CfyClient._logger.error(
+ "Unexpected exception {}: {!s} from ConsulClient.get_value({}) for {} {}"
+ .format(type(e).__name__, e, dti_key, node_instance.deployment_id, node_instance.id)
+ )
+ continue
+ if not obj:
+ CfyClient._logger.debug(
+ "{} {} Consul key '{}' is empty or invalid".format(node_instance.deployment_id, node_instance.id,
+ dti_key))
+ continue
+ obj_types = set(k.lower() for k in obj)
+ if dcae_target_type.lower() in obj_types:
+ CfyClient._logger.debug(
+ "{} {} is a valid collector for VNF Type {}".format(node_instance.deployment_id, node_instance.id,
+ dcae_target_type))
+ cnt_found += 1
+ yield (scn, node_instance.deployment_id, container_id, node_instance.node_id, node_instance.id,
+ node_instance.state, docker_host, dti_reconfig_script, container_type, '', '')
+ continue
+ else:
+ CfyClient._logger.debug(
+ "{} {} VNF Type {} is not in Consul key '{}'".format(node_instance.deployment_id, node_instance.id,
+ dcae_target_type, dti_key))
+ continue
+
+ msg = "Found {} components (collectors) for dcae_target_type={}, dcae_service_location={}, component_type={}" \
+ .format(cnt_found, dcae_target_type, dcae_service_location, component_type)
+ CfyClient._logger.debug(msg)
+
+
+ @staticmethod
+ def iter_components_of_deployment(deployment_id, node_id=None, reconfig_type="app"):
+ """
+ Iterate components of a specific deployment_id.
+
+ Parameters
+ ----------
+ deployment_id : string
+ Cloudify deployment ID that created the component(s).
+ node_id : string
+ Cloudify node ID that created the component.
+ reconfig_type : string
+ "app"
+
+ Returns
+ -------
+ A generator of tuples of component information
+ [ (scn, deployment_id, container_id, node_id, node_instance_id, node_instance_state, docker_host, reconfig_script, "docker"), ... ]
+ or
+ [ (scn, deployment_id, k8s_deployment, node_id, node_instance_id, node_instance_state, config_content, reconfig_script, "k8s"), ... ]
+
+ """
+
+ cnt_found = 0
+
+ CfyClient.__set_cloudify_manager_client()
+ for node_instance in CfyClient._client.node_instances.list(
+ deployment_id=deployment_id,
+ _include=['id','node_id','deployment_id','state','runtime_properties']
+ ):
+ if node_id and node_instance.node_id != node_id:
+ continue
+
+ rtp = node_instance.runtime_properties
+
+ # Skip this node_instance if it is not a collector
+ container_type = "docker"
+ container_id = rtp.get('container_id')
+ if not container_id:
+ container_type = "k8s"
+ container_id = rtp.get('k8s_deployment')
+ if not container_id:
+ CfyClient._logger.debug("{} {} runtime_properties has no container_id or k8s_deployment".format(node_instance.deployment_id, node_instance.id))
+ continue
+ reconfig_script = rtp.get('docker_config',{}).get('reconfigs',{}).get(reconfig_type)
+ if not reconfig_script:
+ CfyClient._logger.debug("{} {} runtime_properties has no docker_config.reconfigs.{}".format(node_instance.deployment_id, node_instance.id, reconfig_type))
+ continue
+ scn = rtp.get('service_component_name')
+ if not scn:
+ CfyClient._logger.debug("{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id, node_instance.id))
+ continue
+ if container_type == "docker":
+ docker_host = rtp.get('selected_container_destination')
+ if not docker_host:
+ CfyClient._logger.debug("{} {} runtime_properties has no selected_container_destination".format(node_instance.deployment_id, node_instance.id))
+ continue
+ elif container_type == "k8s":
+ docker_host = rtp.get('configuration',{}).get('file_content')
+ if not docker_host:
+ CfyClient._logger.debug("{} {} runtime_properties has no configuration.file_content".format(node_instance.deployment_id, node_instance.id))
+ continue
+
+ CfyClient._logger.debug("{} {} is a {}-reconfigurable component".format(node_instance.deployment_id, node_instance.id, reconfig_type))
+ cnt_found += 1
+ yield (scn, node_instance.deployment_id, container_id, node_instance.node_id, node_instance.id, node_instance.state, docker_host, reconfig_script, container_type)
+ continue
+
+ msg = "Found {} {}-reconfigurable components".format(cnt_found, reconfig_type)
+ CfyClient._logger.debug(msg)