author      2019-03-15 12:19:34 -0400
committer   2019-03-15 12:19:34 -0400
commit      f0a9edb94c527c74f45416b9f20c5c90a11bb5de (patch)
tree        3af724220ce338b6450b4a0e1d2469927afddbe3 /engine/src
parent      2f66a722dc1a6d4b37218c554199c046719a6873 (diff)
Initial upload of F-GPS seed code 6/21
Includes:
Engine resource manager code
Change-Id: I1562676793020164686bde53adea33fc3a1e603e
Issue-ID: OPTFRA-440
Signed-off-by: arthur.martella.1@att.com
Diffstat (limited to 'engine/src')
8 files changed, 3458 insertions, 0 deletions
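
For orientation before the file-by-file diff: each manager added below wraps a platform "source" and reconciles what the platform reports against Valet's cached resource view, using plain status returns ("ok" or an error string, True/False). The sketch that follows is not part of this patch; StubSource and StubResource are hypothetical stand-ins for the NovaCompute and Resource classes added further down, reduced to just the interface that ComputeManager.get_hosts() relies on.

import logging

from valet.engine.resource_manager.compute_manager import ComputeManager


class StubSource(object):
    """Hypothetical stand-in exposing the interface ComputeManager expects."""

    def get_hosts(self, hosts):
        # The real NovaCompute fills `hosts` with Host objects from Nova.
        return "ok"

    def get_servers_in_hosts(self, hosts):
        # The real NovaCompute attaches each host's server_list here.
        return "ok"


class StubResource(object):
    """Hypothetical stand-in with just the fields get_hosts() touches."""

    def __init__(self):
        self.hosts = {}
        self.change_of_placements = {}

    def mark_host_updated(self, host_name):
        pass


logger = logging.getLogger("valet.resource_manager")

manager = ComputeManager(StubSource(), logger)
if manager.get_hosts(StubResource()):
    logger.info("compute host view reconciled with the platform")

In the real engine, the source is a NovaCompute instance and the resource is the Resource object defined in resource.py below.
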
diff --git a/engine/src/valet/engine/resource_manager/__init__.py b/engine/src/valet/engine/resource_manager/__init__.py new file mode 100644 index 0000000..bd50995 --- /dev/null +++ b/engine/src/valet/engine/resource_manager/__init__.py @@ -0,0 +1,18 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# diff --git a/engine/src/valet/engine/resource_manager/compute_manager.py b/engine/src/valet/engine/resource_manager/compute_manager.py new file mode 100644 index 0000000..81a95ee --- /dev/null +++ b/engine/src/valet/engine/resource_manager/compute_manager.py @@ -0,0 +1,201 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# ------------------------------------------------------------------------- +# +#!/bin/python + + +from valet.engine.resource_manager.resources.host import Host + + +class ComputeManager(object): + """Resource Manager to maintain compute host resources.""" + + def __init__(self, _source, _logger): + """Define compute hosts and server allocations.""" + + self.source = _source + + self.hosts = {} + + self.logger = _logger + + def get_hosts(self, _resource): + """Check any inconsistency and perform garbage collection if necessary.""" + + self.logger.info("set compute hosts...") + + # Init first + self.hosts.clear() + + # Get available hosts only + if self.source.get_hosts(self.hosts) != "ok": + self.logger.warning("fail to set hosts from source (e.g., nova)") + return False + + # Get servers + if self.source.get_servers_in_hosts(self.hosts) != "ok": + self.logger.warning("fail to set servers from source (e.g., nova)") + return False + + self._check_host_updated(_resource) + + self._check_server_placements(_resource) + + return True + + def _check_host_updated(self, _resource): + """Check if hosts and their properties are changed.""" + + for hk in self.hosts.keys(): + if hk not in _resource.hosts.keys(): + _resource.hosts[hk] = Host(hk) + _resource.mark_host_updated(hk) + + self.logger.info("new host (" + hk + ") added") + + for rhk in _resource.hosts.keys(): + if rhk not in self.hosts.keys(): + _resource.hosts[rhk].status = "disabled" + _resource.mark_host_updated(rhk) + + self.logger.info("host (" + rhk + ") disabled") + + for hk in self.hosts.keys(): + host = self.hosts[hk] + rhost = _resource.hosts[hk] + + if self._is_host_resources_updated(host, rhost): + _resource.mark_host_updated(hk) + + def _is_host_resources_updated(self, _host, _rhost): + """Check the resource amount consistency.""" + + resource_updated = False + + if _host.original_vCPUs != _rhost.original_vCPUs: + _rhost.original_vCPUs = _host.original_vCPUs + + self.logger.info("host (" + _rhost.name + ") updated (origin CPU updated)") + resource_updated = True + + if _host.vCPUs_used != _rhost.vCPUs_used: + _rhost.vCPUs_used = _host.vCPUs_used + + self.logger.info("host (" + _rhost.name + ") updated (CPU updated)") + resource_updated = True + + if _host.original_mem_cap != _rhost.original_mem_cap: + _rhost.original_mem_cap = _host.original_mem_cap + + self.logger.info("host (" + _rhost.name + ") updated (origin mem updated)") + resource_updated = True + + if _host.free_mem_mb != _rhost.free_mem_mb: + _rhost.free_mem_mb = _host.free_mem_mb + + self.logger.info("host (" + _rhost.name + ") updated (mem updated)") + resource_updated = True + + if _host.original_local_disk_cap != _rhost.original_local_disk_cap: + _rhost.original_local_disk_cap = _host.original_local_disk_cap + + self.logger.info("host (" + _rhost.name + ") updated (origin disk updated)") + resource_updated = True + + if _host.free_disk_gb != _rhost.free_disk_gb: + _rhost.free_disk_gb = _host.free_disk_gb + + self.logger.info("host (" + _rhost.name + ") updated (local disk space updated)") + resource_updated = True + + if _host.disk_available_least != _rhost.disk_available_least: + _rhost.disk_available_least = _host.disk_available_least + + self.logger.info("host (" + _rhost.name + ") updated (least disk space updated)") + resource_updated = True + + return resource_updated + + def _check_server_placements(self, _resource): + """Check the consistency of server placements with nova.""" + + # To keep how server placements changed. 
+ # key = + # If uuid is available, uuid + # Else stack_id:name + # value = {new_host, old_host, server_info} + # The server's state must be either 'created', 'migrated', or 'rebuilt'. + # That is, deal with only the server which placement decision is confirmed. + # If value of new_host (from nova) exists but not for old_host (valet), + # the server is unknown one to valet. + # If value of new_host not exists but exist for old_host, + # the server is deleted by nova. + # If value exists both in new_host and old_host, + # the server is moved from old to new host. + # If value not exist neither, + # the server is placed as planned. + change_of_placements = {} + + for hk, host in self.hosts.iteritems(): + rhost = _resource.hosts[hk] + + for s_info in host.server_list: + if s_info["stack_id"] != "none": + sid = s_info["stack_id"] + ":" + s_info["name"] + else: + sid = s_info["uuid"] + + change_of_placements[sid] = {} + change_of_placements[sid]["info"] = s_info + + if not rhost.has_server(s_info): + change_of_placements[sid]["new_host"] = hk + + self.logger.info("host (" + hk + ") updated (server added)") + else: + change_of_placements[sid]["host"] = hk + + for rhk, rhost in _resource.hosts.iteritems(): + if not rhost.is_available(): + continue + + host = self.hosts[rhk] + + for s_info in rhost.server_list: + # Deal with confirmed placements only. + if s_info["state"] not in ("created", "migrated", "rebuilt"): + continue + + if s_info["stack_id"] != "none": + sid = s_info["stack_id"] + ":" + s_info["name"] + else: + sid = s_info["uuid"] + + if not host.has_server(s_info): + if sid in change_of_placements.keys(): + change_of_placements[sid]["old_host"] = rhk + + self.logger.info("server (" + sid + ") is migrated`") + else: + change_of_placements[sid] = {} + change_of_placements[sid]["old_host"] = rhk + change_of_placements[sid]["info"] = s_info + + self.logger.info("server (" + sid + ") is deleted") + + _resource.change_of_placements = change_of_placements diff --git a/engine/src/valet/engine/resource_manager/metadata_manager.py b/engine/src/valet/engine/resource_manager/metadata_manager.py new file mode 100644 index 0000000..f34e3f0 --- /dev/null +++ b/engine/src/valet/engine/resource_manager/metadata_manager.py @@ -0,0 +1,424 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +#!/bin/python + + +import json + +from copy import deepcopy + + +class MetadataManager(object): + """Metadata Manager to maintain flavors and groups.""" + + def __init__(self, _source, _logger): + self.source = _source + + self.groups = {} + + self.flavors = {} + + self.logger = _logger + + def get_groups(self, _resource): + """Set groups (availability-zones, host-aggregates, server groups) + + from platform (e.g., nova). 
+ """ + + self.logger.info("set metadata (groups)...") + + # Init first + self.groups.clear() + + # Get enabled groups only + if self.source.get_groups(self.groups) != "ok": + self.logger.warning("fail to set groups from source (e.g., nova)") + return False + + self._check_group_updated(_resource) + + self._check_host_memberships_updated(_resource) + + return True + + def _check_group_updated(self, _resource): + """Check any inconsistency for groups.""" + + for gk in self.groups.keys(): + if gk not in _resource.groups.keys(): + _resource.groups[gk] = deepcopy(self.groups[gk]) + _resource.groups[gk].updated = True + + self.logger.info("new group (" + gk + ") added") + + for rgk in _resource.groups.keys(): + rg = _resource.groups[rgk] + + if rg.factory != "valet": + if rgk not in self.groups.keys(): + rg.status = "disabled" + rg.updated = True + + self.logger.info("group (" + rgk + ") disabled") + + for gk in self.groups.keys(): + g = self.groups[gk] + rg = _resource.groups[gk] + + if rg.uuid is None and g.uuid is not None: + rg.uuid = g.uuid + rg.updated = True + + self.logger.info("group (" + gk + ") uuid updated") + + # TODO: Clean up resource.hosts if each is not in any AZ members. + + if g.group_type == "aggr": + if not gk.startswith("valet:"): + if self._is_group_metadata_updated(g, rg): + rg.updated = True + + self.logger.info("group (" + gk + ") metadata updated") + + if g.group_type == "az" or g.group_type == "aggr": + if self._is_member_hosts_updated(g, _resource): + rg.updated = True + + self.logger.info("group (" + gk + ") member hosts updated") + + if g.factory == "server-group": + if self._is_new_servers(g, rg): + rg.updated = True + + self.logger.info("group (" + gk + ") server_list updated") + + def _is_group_metadata_updated(self, _g, _rg): + """Check any change in metadata of group.""" + + updated = False + + for mdk in _g.metadata.keys(): + if mdk not in _rg.metadata.keys(): + _rg.metadata[mdk] = _g.metadata[mdk] + updated = True + + for rmdk in _rg.metadata.keys(): + if rmdk not in _g.metadata.keys(): + del _rg.metadata[rmdk] + updated = True + + for mdk in _g.metadata.keys(): + mdv = _g.metadata[mdk] + rmdv = _rg.metadata[mdk] + if mdv != rmdv: + _rg.metadata[mdk] = mdv + updated = True + + return updated + + def _is_member_hosts_updated(self, _g, _resource): + """Check any change in member hosts of group.""" + + updated = False + + _rg = _resource.groups[_g.name] + + for hk in _g.member_hosts.keys(): + if hk not in _rg.member_hosts.keys(): + if hk in _resource.hosts.keys(): + if _resource.hosts[hk].is_available(): + _rg.member_hosts[hk] = deepcopy(_g.member_hosts[hk]) + updated = True + # else not needed + + for rhk in _rg.member_hosts.keys(): + if rhk not in _resource.hosts.keys() or \ + not _resource.hosts[rhk].is_available() or \ + rhk not in _g.member_hosts.keys(): + del _rg.member_hosts[rhk] + updated = True + + return updated + + def _is_new_servers(self, _g, _rg): + """Check if there is any new server.""" + + updated = False + + for s_info in _g.server_list: + exist = False + for rs_info in _rg.server_list: + if rs_info.get("uuid") == s_info.get("uuid"): + exist = True + break + + if not exist: + _rg.server_list.append(s_info) + updated = True + + return updated + + def _check_host_memberships_updated(self, _resource): + """Check host memberships consistency.""" + + for gk, g in _resource.groups.iteritems(): + # Other group types will be handled later + if g.factory != "valet" and g.status == "enabled": + for hk in g.member_hosts.keys(): + host = 
_resource.hosts[hk] + if gk not in host.memberships.keys() or g.updated: + host.memberships[gk] = g + _resource.mark_host_updated(hk) + + self.logger.info("host (" + hk + ") updated (update membership - " + gk + ")") + + for hk, host in _resource.hosts.iteritems(): + if host.is_available(): + for gk in host.memberships.keys(): + if gk in _resource.groups.keys(): + g = _resource.groups[gk] + if g.factory != "valet": + if g.status == "enabled": + if g.updated: + host.memberships[gk] = g + _resource.mark_host_updated(hk) + + self.logger.info("host (" + hk + ") updated (update membership - " + gk + ")") + else: + del host.memberships[gk] + _resource.mark_host_updated(hk) + + self.logger.info("host (" + hk + ") updated (remove membership - " + gk + ")") + else: + del host.memberships[gk] + _resource.mark_host_updated(hk) + + self.logger.info("host (" + hk + ") updated (remove membership - " + gk + ")") + + def create_exclusive_aggregate(self, _group, _hosts): + """Set Host-Aggregate to apply Exclusivity.""" + + az = _hosts[0].get_availability_zone() + + # To remove 'az:' header from name + az_name_elements = az.name.split(':', 1) + if len(az_name_elements) > 1: + az_name = az_name_elements[1] + else: + az_name = az.name + + status = self.source.set_aggregate(_group.name, az_name) + if status != "ok": + return status + + self.logger.debug("dynamic host-aggregate(" + _group.name + ") created") + + aggregates = {} + status = self.source.get_aggregates(aggregates) + if status != "ok": + return status + + if _group.name in aggregates.keys(): + _group.uuid = aggregates[_group.name].uuid + + if len(_group.metadata) > 0: + metadata = {} + for mk, mv in _group.metadata.iteritems(): + if mk == "prior_metadata": + metadata[mk] = json.dumps(mv) + else: + metadata[mk] = mv + + status = self.source.set_metadata_of_aggregate(_group.uuid, metadata) + if status != "ok": + return status + + self.logger.debug("dynamic host-aggregate(" + _group.name + ") metadata created") + + for host in _hosts: + if host.name in _group.metadata.keys(): + aggr_uuids = _group.metadata[host.name].split(',') + + for uuid in aggr_uuids: + status = self.source.remove_host_from_aggregate(int(uuid), host.name) + if status != "ok": + return status + + self.logger.debug("host-aggregate(" + uuid + ") host(" + host.name + ") removed") + + status = self.source.add_host_to_aggregate(_group.uuid, host.name) + if status != "ok": + return status + + self.logger.debug("dynamic host-aggregate(" + _group.name + ") host(" + host.name + ") added") + else: + status = "dynamic host-aggregate not found" + self.logger.error(status) + return status + + return "ok" + + def update_exclusive_aggregate(self, _id, _metadata, _host, _old_aggregates): + """Update Host-Aggregate to apply Exclusivity.""" + + if len(_metadata) > 0: + metadata = {} + for mk, mv in _metadata.iteritems(): + if mk == "prior_metadata": + metadata[mk] = json.dumps(mv) + else: + metadata[mk] = mv + + status = self.source.set_metadata_of_aggregate(_id, metadata) + if status != "ok": + return status + + self.logger.debug("dynamic host-aggregate(" + str(_id) + ") metadata updated") + + for oa in _old_aggregates: + status = self.source.remove_host_from_aggregate(oa.uuid, _host) + if status != "ok": + return status + + self.logger.debug("host-aggregate(" + oa.name + ") host(" + _host + ") removed") + + status = self.source.add_host_to_aggregate(_id, _host) + if status != "ok": + return status + + self.logger.debug("dynamic host-aggregate(" + str(_id) + ") host(" + _host + ") added") + + return 
"ok" + + def remove_host_from_exclusive_aggregate(self, _id, _metadata, _host, _old_aggregates): + """Remove host from Host-Aggregate to apply Exclusivity.""" + + if len(_metadata) > 0: + metadata = {} + for mk, mv in _metadata.iteritems(): + if mk == "prior_metadata": + metadata[mk] = json.dumps(mv) + else: + metadata[mk] = mv + + status = self.source.set_metadata_of_aggregate(_id, metadata) + if status != "ok": + return status + + self.logger.debug("dynamic host-aggregate(" + str(_id) + ") metadata updated") + + status = self.source.remove_host_from_aggregate(_id, _host) + if status != "ok": + return status + + self.logger.debug("dynamic host-aggregate(" + str(_id) + ") host(" + _host + ") removed") + + for oa in _old_aggregates: + status = self.source.add_host_to_aggregate(oa.uuid, _host) + if status != "ok": + return status + + self.logger.debug("host-aggregate(" + oa.name + ") host(" + _host + ") added") + + return "ok" + + def remove_exclusive_aggregate(self, _id): + """Remove Host-Aggregate.""" + + status = self.source.delete_aggregate(_id) + if status != "ok": + return status + + self.logger.debug("dynamic host-aggregate(" + str(_id) + ") removed") + + return "ok" + + def get_flavors(self, _resource): + """Set flavors from nova.""" + + self.logger.info("set metadata (flavors)...") + + # Init first + self.flavors.clear() + + # Get enabled flavors only + if self.source.get_flavors(self.flavors, detailed=False) != "ok": + return False + + self._check_flavor_update(_resource, False) + + return True + + def _check_flavor_update(self, _resource, _detailed): + """Check flavor info consistency.""" + + for fk in self.flavors.keys(): + if fk not in _resource.flavors.keys(): + _resource.flavors[fk] = deepcopy(self.flavors[fk]) + _resource.flavors[fk].updated = True + + self.logger.info("new flavor (" + fk + ":" + self.flavors[fk].flavor_id + ") added") + + for rfk in _resource.flavors.keys(): + rf = _resource.flavors[rfk] + if rfk not in self.flavors.keys(): + rf.status = "disabled" + rf.updated = True + + self.logger.info("flavor (" + rfk + ":" + rf.flavor_id + ") removed") + + if _detailed: + for fk in self.flavors.keys(): + f = self.flavors[fk] + rf = _resource.flavors[fk] + if self._is_flavor_spec_updated(f, rf): + rf.updated = True + + self.logger.info("flavor (" + fk + ":" + rf.flavor_id + ") spec updated") + + def _is_flavor_spec_updated(self, _f, _rf): + """Check flavor's spec consistency.""" + + spec_updated = False + + if _f.vCPUs != _rf.vCPUs or _f.mem_cap != _rf.mem_cap or _f.disk_cap != _rf.disk_cap: + _rf.vCPUs = _f.vCPUs + _rf.mem_cap = _f.mem_cap + _rf.disk_cap = _f.disk_cap + spec_updated = True + + for sk in _f.extra_specs.keys(): + if sk not in _rf.extra_specs.keys(): + _rf.extra_specs[sk] = _f.extra_specs[sk] + spec_updated = True + + for rsk in _rf.extra_specs.keys(): + if rsk not in _f.extra_specs.keys(): + del _rf.extra_specs[rsk] + spec_updated = True + + for sk in _f.extra_specs.keys(): + sv = _f.extra_specs[sk] + rsv = _rf.extra_specs[sk] + if sv != rsv: + _rf.extra_specs[sk] = sv + spec_updated = True + + return spec_updated diff --git a/engine/src/valet/engine/resource_manager/naming.py b/engine/src/valet/engine/resource_manager/naming.py new file mode 100644 index 0000000..bdf5211 --- /dev/null +++ b/engine/src/valet/engine/resource_manager/naming.py @@ -0,0 +1,146 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); 
+# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +#!/bin/python + + +import copy +import re + +from sre_parse import isdigit +from valet.engine.resource_manager.resources.host_group import HostGroup + + +class Naming(object): + """Using cannonical naming convention to capture datacenter layout.""" + + def __init__(self, _config, _logger): + self.logger = _logger + + self.rack_code_list = _config.get("rack_codes") + self.host_code_list = _config.get("host_codes") + + def get_topology(self, _datacenter, _host_groups, _hosts, _rhosts): + """Set datacenter resource structure (racks, hosts).""" + + status = "ok" + + for rhk, rhost in _rhosts.iteritems(): + h = copy.deepcopy(rhost) + + (rack_name, parsing_status) = self._set_layout_by_name(rhk) + if parsing_status != "ok": + self.logger.warning(parsing_status + " in host_name (" + rhk + ")") + + if rack_name == "none": + h.host_group = _datacenter + _datacenter.resources[rhk] = h + else: + if rack_name not in _host_groups.keys(): + host_group = HostGroup(rack_name) + host_group.host_type = "rack" + _host_groups[host_group.name] = host_group + else: + host_group = _host_groups[rack_name] + + h.host_group = host_group + host_group.child_resources[rhk] = h + + _hosts[h.name] = h + + for hgk, hg in _host_groups.iteritems(): + hg.parent_resource = _datacenter + _datacenter.resources[hgk] = hg + + if "none" in _host_groups.keys(): + self.logger.warning("some hosts are into unknown rack") + + return status + + def _set_layout_by_name(self, _host_name): + """Set the rack-host layout, use host nameing convention. 
+ + Naming convention includes + zone name is any word followed by at least one of [0-9] + rack name is rack_code followd by at least one of [0-9] + host name is host_code followed by at least one of [0-9] + an example is + 'abcd_001A' (as a zone_name) + + 'r' (as a rack_code) + '01A' + + 'c' (as a host_code) + '001A' + """ + + zone_name = None + rack_name = None + host_name = None + + # To check if zone name follows the rule + index = 0 + for c in _host_name: + if isdigit(c): + break + index += 1 + zone_indicator = _host_name[index:] + if len(zone_indicator) == 0: + return 'none', "no numberical digit in name" + + # To extract rack indicator + for rack_code in self.rack_code_list: + rack_index_list = [rc.start() for rc in re.finditer(rack_code, zone_indicator)] + + start_of_rack_index = -1 + for rack_index in rack_index_list: + rack_prefix = rack_index + len(rack_code) + if rack_prefix > len(zone_indicator): + continue + + # Once rack name follows the rule + if isdigit(zone_indicator[rack_prefix]): + rack_indicator = zone_indicator[rack_prefix:] + + # To extract host indicator + for host_code in self.host_code_list: + host_index_list = [hc.start() for hc in re.finditer(host_code, rack_indicator)] + + start_of_host_index = -1 + for host_index in host_index_list: + host_prefix = host_index + len(host_code) + if host_prefix > len(rack_indicator): + continue + + if isdigit(rack_indicator[host_prefix]): + host_name = rack_indicator[host_index:] + start_of_host_index = rack_index + host_index + 1 + break + + if host_name is not None: + rack_name = zone_indicator[rack_index:start_of_host_index] + break + + if rack_name is not None: + start_of_rack_index = index + rack_index + break + + if rack_name is not None: + zone_name = _host_name[:start_of_rack_index] + break + + if rack_name is None: + return 'none', "no host or rack name found in " + _host_name + else: + return zone_name + rack_name, "ok" diff --git a/engine/src/valet/engine/resource_manager/nova_compute.py b/engine/src/valet/engine/resource_manager/nova_compute.py new file mode 100644 index 0000000..6887eb8 --- /dev/null +++ b/engine/src/valet/engine/resource_manager/nova_compute.py @@ -0,0 +1,544 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +import json
+import time
+import traceback
+
+from novaclient import client as nova_client
+
+from valet.engine.resource_manager.resources.flavor import Flavor
+from valet.engine.resource_manager.resources.group import Group
+from valet.engine.resource_manager.resources.host import Host
+from valet.utils.decryption import decrypt
+
+
+# Nova API version
+VERSION = 2
+
+
+# noinspection PyBroadException
+class NovaCompute(object):
+ """Source to collect resource status (i.e., OpenStack Nova).
+
+ Manipulate Host-Aggregate with Valet placement decisions.
+ """
+
+ def __init__(self, _config, _logger):
+ self.logger = _logger
+
+ self.nova = None
+
+ self.novas = {}
+ self.last_activate_urls = {}
+ self.life_time = 43200 # 12 hours
+
+ # TODO(Gueyoung): handle both admin and admin_view accounts.
+
+ pw = decrypt(_config["engine"]["ek"],
+ _config["logging"]["lk"],
+ _config["db"]["dk"],
+ _config["nova"]["admin_view_password"])
+
+ self.admin_username = _config["nova"]["admin_view_username"]
+ self.admin_password = pw
+ self.project = _config["nova"]["project_name"]
+
+ def set_client(self, _auth_url):
+ """Set nova client."""
+
+ try:
+ # TODO: add timeout=_timeout?
+ self.novas[_auth_url] = nova_client.Client(VERSION,
+ self.admin_username,
+ self.admin_password,
+ self.project,
+ _auth_url)
+
+ self.last_activate_urls[_auth_url] = time.time()
+
+ self.nova = self.novas[_auth_url]
+ return True
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return False
+
+ def valid_client(self, _auth_url):
+ """Check if nova connection is valid."""
+
+ if _auth_url not in self.novas.keys():
+ return False
+
+ if _auth_url not in self.last_activate_urls.keys():
+ return False
+
+ elapsed_time = time.time() - self.last_activate_urls[_auth_url]
+
+ if elapsed_time > self.life_time:
+ return False
+
+ self.nova = self.novas[_auth_url]
+
+ return True
+
+ def get_groups(self, _groups):
+ """Get server-groups, availability-zones and host-aggregates
+
+ from OpenStack Nova.
+ """
+
+ status = self._get_availability_zones(_groups)
+ if status != "ok":
+ self.logger.error(status)
+ return status
+
+ status = self.get_aggregates(_groups)
+ if status != "ok":
+ self.logger.error(status)
+ return status
+
+ status = self._get_server_groups(_groups)
+ if status != "ok":
+ self.logger.error(status)
+ return status
+
+ return "ok"
+
+ def _get_availability_zones(self, _groups):
+ """Set AZs."""
+
+ try:
+ # TODO: try hosts_list = self.nova.hosts.list()?
+
+ az_list = self.nova.availability_zones.list(detailed=True)
+
+ for a in az_list:
+ if a.zoneState["available"]:
+ # NOTE(Gueyoung): add 'az:' to avoid conflict with
+ # Host-Aggregate name.
+ az_id = "az:" + a.zoneName
+
+ az = Group(az_id)
+
+ az.group_type = "az"
+ az.factory = "nova"
+ az.level = "host"
+
+ # TODO: Get AZ first with init Compute Hosts?
+
+ for hk, h_info in a.hosts.iteritems():
+ if "nova-compute" in h_info.keys():
+ if h_info["nova-compute"]["active"] and \
+ h_info["nova-compute"]["available"]:
+ az.member_hosts[hk] = []
+
+ _groups[az_id] = az
+
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while setting availability-zones from Nova"
+
+ return "ok"
+
+ def get_aggregates(self, _groups):
+ """Set host-aggregates and corresponding hosts."""
+
+ try:
+ aggregate_list = self.nova.aggregates.list()
+
+ for a in aggregate_list:
+ if not a.deleted:
+ aggregate = Group(a.name)
+
+ aggregate.uuid = a.id
+
+ aggregate.group_type = "aggr"
+ aggregate.factory = "nova"
+ aggregate.level = "host"
+
+ metadata = {}
+ for mk in a.metadata.keys():
+ if mk == "prior_metadata":
+ metadata[mk] = json.loads(a.metadata.get(mk))
+ else:
+ metadata[mk] = a.metadata.get(mk)
+ aggregate.metadata = metadata
+
+ for hn in a.hosts:
+ aggregate.member_hosts[hn] = []
+
+ _groups[aggregate.name] = aggregate
+
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while setting host-aggregates from Nova"
+
+ return "ok"
+
+ def set_aggregate(self, _name, _az):
+ """Create a Host-Aggregate."""
+
+ try:
+ self.nova.aggregates.create(_name, _az)
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while setting a host-aggregate in Nova"
+
+ return "ok"
+
+ def add_host_to_aggregate(self, _aggr, _host):
+ """Add a Host into the Host-Aggregate."""
+
+ try:
+ self.nova.aggregates.add_host(_aggr, _host)
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while adding a host into host-aggregate in Nova"
+
+ return "ok"
+
+ def delete_aggregate(self, _aggr):
+ """Delete the Host-Aggregate."""
+
+ try:
+ self.nova.aggregates.delete(_aggr)
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while deleting host-aggregate from Nova"
+
+ return "ok"
+
+ def remove_host_from_aggregate(self, _aggr, _host):
+ """Remove the Host from the Host-Aggregate."""
+
+ try:
+ self.nova.aggregates.remove_host(_aggr, _host)
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while removing host from host-aggregate in Nova"
+
+ return "ok"
+
+ def set_metadata_of_aggregate(self, _aggr, _metadata):
+ """Set metadata.
+
+ Note that Nova adds key/value pairs into metadata instead of replacement.
+ """
+
+ try:
+ self.nova.aggregates.set_metadata(_aggr, _metadata)
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while setting metadata of host-aggregate in Nova"
+
+ return "ok"
+
+ def _get_server_groups(self, _groups):
+ """Set host-aggregates and corresponding hosts."""
+
+ try:
+ # NOTE(Gueyoung): novaclient v2.18.0 does not have 'all_projects=True' param.
+ server_group_list = self.nova.server_groups.list()
+
+ for g in server_group_list:
+ server_group = Group(g.name)
+
+ server_group.uuid = g.id
+
+ # TODO: Check len(g.policies) == 1
+ # policy is either 'affinity', 'anti-affinity', 'soft-affinity',
+ # or 'soft-anti-affinity'
+ if g.policies[0] == "anti-affinity":
+ server_group.group_type = "diversity"
+ else:
+ server_group.group_type = g.policies[0]
+ server_group.factory = "server-group"
+ server_group.level = "host"
+
+ # Members attribute is a list of server uuids
+ for s_uuid in g.members:
+ s_info = {}
+ s_info["stack_id"] = "none"
+ s_info["stack_name"] = "none"
+ s_info["uuid"] = s_uuid
+ s_info["orch_id"] = "none"
+ s_info["name"] = "none"
+ s_info["flavor_id"] = "none"
+ s_info["vcpus"] = -1
+ s_info["mem"] = -1
+ s_info["disk"] = -1
+ s_info["numa"] = "none"
+ s_info["image_id"] = "none"
+ s_info["tenant_id"] = "none"
+ s_info["state"] = "created"
+ s_info["status"] = "valid"
+
+ server_group.server_list.append(s_info)
+
+ # TODO: Check duplicated name as group identifier
+ _groups[server_group.name] = server_group
+
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while setting server-groups from Nova"
+
+ return "ok"
+
+ def get_hosts(self, _hosts):
+ """Set host resources info."""
+
+ # TODO: Deprecated as of version 2.43
+ status = self._get_hosts(_hosts)
+ if status != "ok":
+ self.logger.error(status)
+ return status
+
+ status = self._get_host_details(_hosts)
+ if status != "ok":
+ self.logger.error(status)
+ return status
+
+ return "ok"
+
+ # TODO: Deprecated as of version 2.43
+ def _get_hosts(self, _hosts):
+ """Init hosts."""
+
+ try:
+ host_list = self.nova.hosts.list()
+
+ for h in host_list:
+ if h.service == "compute":
+ host = Host(h.host_name)
+ _hosts[host.name] = host
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while setting hosts from Nova"
+
+ return "ok"
+
+ def _get_host_details(self, _hosts):
+ """Get each host's resource status."""
+
+ try:
+ # TODO: marker: the last UUID of return, limit: the number of hosts returned.
+ # with_servers=True
+ host_list = self.nova.hypervisors.list(detailed=True)
+
+ for hv in host_list:
+ if hv.service['host'] in _hosts.keys():
+ if hv.status != "enabled" or hv.state != "up":
+ del _hosts[hv.service['host']]
+ else:
+ host = _hosts[hv.service['host']]
+
+ host.uuid = hv.id
+
+ host.status = hv.status
+ host.state = hv.state
+ host.original_vCPUs = float(hv.vcpus)
+ host.vCPUs_used = float(hv.vcpus_used)
+ host.original_mem_cap = float(hv.memory_mb)
+ host.free_mem_mb = float(hv.free_ram_mb)
+ host.original_local_disk_cap = float(hv.local_gb)
+ host.free_disk_gb = float(hv.free_disk_gb)
+ host.disk_available_least = float(hv.disk_available_least)
+
+ # TODO: cpu_info:topology:sockets
+
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while setting host resources from Nova"
+
+ return "ok"
+
+ def get_servers_in_hosts(self, _hosts):
+ """Set servers in hosts."""
+
+ (status, server_list) = self.get_server_detail()
+ if status != "ok":
+ self.logger.error(status)
+ return status
+
+ for s in server_list:
+ s_info = {}
+
+ if "stack-id" in s.metadata.keys():
+ s_info["stack_id"] = s.metadata["stack-id"]
+ else:
+ s_info["stack_id"] = "none"
+ s_info["stack_name"] = "none"
+
+ s_info["uuid"] = s.id
+
+ s_info["orch_id"] = "none"
+ s_info["name"] = s.name
+
+ s_info["flavor_id"] = s.flavor["id"]
+
+ if "vcpus" in s.flavor.keys():
+ s_info["vcpus"] = s.flavor["vcpus"]
+ s_info["mem"] = s.flavor["ram"]
+ s_info["disk"] = s.flavor["disk"]
+ s_info["disk"] += s.flavor["ephemeral"]
+ s_info["disk"] += s.flavor["swap"] / float(1024)
+ else:
+ s_info["vcpus"] = -1
+ s_info["mem"] = -1
+ s_info["disk"] = -1
+
+ s_info["numa"] = "none"
+
+ try:
+ s_info["image_id"] = s.image["id"]
+ except TypeError:
+ self.logger.warning("In get_servers_in_hosts, expected s.image to have id tag, but it's actually " + s.image)
+ s_info["image_id"] = s.image
+
+ s_info["tenant_id"] = s.tenant_id
+
+ s_info["state"] = "created"
+ s_info["status"] = "valid"
+
+ s_info["host"] = s.__getattr__("OS-EXT-SRV-ATTR:host")
+
+ # s_info["power_state"] = s.__getattr__("OS-EXT-STS:power_state")
+ # s_info["vm_state"] = s.__getattr__("OS-EXT-STS:vm_state")
+ # s_info["task_state"] = s.__getattr__("OS-EXT-STS:task_state")
+
+ if s_info["host"] in _hosts.keys():
+ host = _hosts[s_info["host"]]
+ host.server_list.append(s_info)
+
+ return "ok"
+
+ def get_server_detail(self, project_id=None, host_name=None, server_name=None, uuid=None):
+ """Get the detail of server with search by option."""
+
+ # TODO: Get servers' info in each host
+ # Minimum requirement for server info: s["metadata"]["stack-id"],
+ # More: s["flavor"]["id"], s["tenant_id"]
+ # Maybe: s["image"], server.__getattr__("OS-EXT-AZ:availability_zone"), s["status"]
+ # and scheduler_hints?
+ try:
+ options = {"all_tenants": 1}
+ if project_id is not None:
+ options["project_id"] = project_id
+ if host_name is not None:
+ options["host"] = host_name
+ if server_name is not None:
+ options["name"] = server_name
+ if uuid is not None:
+ options["uuid"] = uuid
+
+ # TODO: search by vm_state?
+
+ if len(options) > 0:
+ server_list = self.nova.servers.list(detailed=True, search_opts=options)
+ else:
+ server_list = self.nova.servers.list(detailed=True)
+
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while getting server detail from nova", None
+
+ return "ok", server_list
+
+ def get_flavors(self, _flavors, detailed=True):
+ """Get flavors."""
+
+ if detailed:
+ result_status = self._get_flavors(_flavors, True)
+ else:
+ result_status = self._get_flavors(_flavors, False)
+
+ if result_status != "ok":
+ self.logger.error(result_status)
+
+ return result_status
+
+ def _get_flavors(self, _flavors, _detailed):
+ """Get a list of all flavors."""
+
+ try:
+ flavor_list = self.nova.flavors.list(detailed=_detailed)
+
+ for f in flavor_list:
+ flavor = self._set_flavor(f, _detailed)
+ _flavors[flavor.name] = flavor
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while getting flavors"
+
+ # To get non-public flavors.
+ try:
+ flavor_list = self.nova.flavors.list(detailed=_detailed, is_public=False)
+
+ for f in flavor_list:
+ if f.name not in _flavors.keys():
+ flavor = self._set_flavor(f, _detailed)
+ _flavors[flavor.name] = flavor
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while getting flavors"
+
+ return "ok"
+
+ def get_flavor(self, _flavor_id):
+ """Get the flavor."""
+
+ try:
+ f = self.nova.flavors.get(_flavor_id)
+ flavor = self._set_flavor(f, True)
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return None
+
+ return flavor
+
+ def _set_flavor(self, _f, _detailed):
+ """Set flavor with detailed infomation."""
+
+ flavor = Flavor(_f.name)
+
+ flavor.flavor_id = _f.id
+
+ if _detailed:
+ # NOTE(Gueyoung): This is not allowed with current credential.
+ # if getattr(_f, "OS-FLV-DISABLED:disabled"):
+ # flavor.status = "disabled"
+
+ flavor.vCPUs = float(_f.vcpus)
+ flavor.mem_cap = float(_f.ram)
+
+ root_gb = float(_f.disk)
+ ephemeral_gb = 0.0
+ if hasattr(_f, "OS-FLV-EXT-DATA:ephemeral"):
+ ephemeral_gb = float(getattr(_f, "OS-FLV-EXT-DATA:ephemeral"))
+ swap_mb = 0.0
+ if hasattr(_f, "swap"):
+ sw = getattr(_f, "swap")
+ if sw != '':
+ swap_mb = float(sw)
+ flavor.disk_cap = root_gb + ephemeral_gb + swap_mb / float(1024)
+
+ extra_specs = _f.get_keys()
+ for sk, sv in extra_specs.iteritems():
+ flavor.extra_specs[sk] = sv
+
+ return flavor
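
To show how the NovaCompute source above is intended to be driven, here is a minimal sketch (not part of the patch): clients are cached per auth URL and re-authenticated once they exceed life_time seconds, and each collection call returns "ok" on success or an error message string. The config keys mirror those read in __init__; all values, the logger name, and the auth URL are placeholders.

import logging

from valet.engine.resource_manager.nova_compute import NovaCompute

logger = logging.getLogger("valet.nova_compute")

# Placeholder settings; only the keys read by NovaCompute.__init__ are shown.
config = {
    "engine": {"ek": "<engine-key>"},
    "logging": {"lk": "<logging-key>"},
    "db": {"dk": "<db-key>"},
    "nova": {
        "admin_view_username": "admin_view",
        "admin_view_password": "<encrypted-password>",
        "project_name": "admin",
    },
}

auth_url = "http://keystone.example.com:5000/v2.0"  # placeholder

source = NovaCompute(config, logger)

# Reuse a cached client while it is younger than life_time, else re-authenticate.
if not source.valid_client(auth_url):
    if not source.set_client(auth_url):
        raise RuntimeError("could not authenticate against Nova")

hosts = {}
status = source.get_hosts(hosts)                  # "ok" or an error string
if status == "ok":
    status = source.get_servers_in_hosts(hosts)   # attach server_list to each Host

groups = {}
if status == "ok":
    status = source.get_groups(groups)            # AZs, host-aggregates, server groups
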
diff --git a/engine/src/valet/engine/resource_manager/resource.py b/engine/src/valet/engine/resource_manager/resource.py new file mode 100644 index 0000000..0f2b550 --- /dev/null +++ b/engine/src/valet/engine/resource_manager/resource.py @@ -0,0 +1,1589 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +import json +import six +import time + +from valet.engine.app_manager.group import LEVEL +from valet.engine.resource_manager.resources.datacenter import Datacenter +from valet.engine.resource_manager.resources.flavor import Flavor +from valet.engine.resource_manager.resources.group import Group +from valet.engine.resource_manager.resources.host import Host +from valet.engine.resource_manager.resources.host_group import HostGroup +from valet.engine.resource_manager.resources.numa import NUMA + + +class Resource(object): + """Container for resource status of a datacenter and all metadata.""" + + def __init__(self, _datacenter, _dbh, _compute, _metadata, _topology, _logger): + self.dbh = _dbh + + self.compute = _compute + self.metadata = _metadata + self.topology = _topology + + self.group_rules = {} + + self.datacenter = None + self.datacenter_id = _datacenter.get("id") + self.datacenter_url = _datacenter.get("url", "none") + + self.host_groups = {} + self.hosts = {} + + self.change_of_placements = {} + + self.groups = {} + self.flavors = {} + + self.CPU_avail = 0 + self.mem_avail = 0 + self.local_disk_avail = 0 + + self.default_cpu_allocation_ratio = 1.0 + self.default_ram_allocation_ratio = 1.0 + self.default_disk_allocation_ratio = 1.0 + + self.new = False + + # To keep unconfirmed requests. + # If exist, do NOT sync with platform for the next request. + self.pending_requests = [] + + self.logger = _logger + + def set_config(self, _cpu_ratio, _ram_ratio, _disk_ratio): + self.default_cpu_allocation_ratio = _cpu_ratio + self.default_ram_allocation_ratio = _ram_ratio + self.default_disk_allocation_ratio = _disk_ratio + + def set_group_rules(self, _rules): + self.group_rules = _rules + + def load_resource_from_db(self): + """Load datacenter's resource info from DB. + + Note: all resources in DB are enabled ones. + """ + + self.logger.info("load datacenter resource info from DB") + + # Load Valet groups first. 
+ valet_group_list = self.dbh.get_valet_groups() + if valet_group_list is None: + return None + + valet_groups = {} + for vg in valet_group_list: + vgk = vg.get("id") + dc_id = vgk.split(':', 1) + + if dc_id[0] == self.datacenter_id: + if vg["rule_id"] in self.group_rules.keys(): + vg["metadata"] = json.loads(vg["metadata"]) + vg["server_list"] = json.loads(vg["server_list"]) + vg["member_hosts"] = json.loads(vg["member_hosts"]) + vg["group_type"] = vg["type"] + + valet_groups[vgk] = vg + + self._load_groups(valet_groups) + + dcr = self.dbh.get_resource(self.datacenter_id) + if dcr is None: + return None + + if len(dcr) == 0: + return "no resource found for datacenter = " + self.datacenter_id + + if self.datacenter_url == "none": + self.datacenter_url = dcr["url"] + + pending_requests = json.loads(dcr["requests"]) + for req in pending_requests: + self.pending_requests.append(req) + + resource = json.loads(dcr["resource"]) + + groups = resource.get("groups") + if groups: + self._load_groups(groups) + + flavors = resource.get("flavors") + if flavors: + self._load_flavors(flavors) + + if len(self.flavors) == 0: + self.logger.warning("no flavors in db record") + + hosts = resource.get("hosts") + if hosts: + self._load_hosts(hosts) + + if len(self.hosts) == 0: + self.logger.warning("no hosts in db record") + + host_groups = resource.get("host_groups") + if host_groups: + self._load_host_groups(host_groups) + + if len(self.host_groups) == 0: + self.logger.warning("no host_groups (rack)") + + dc = resource.get("datacenter") + self._load_datacenter(dc) + + for ck in dc.get("children"): + if ck in self.host_groups.keys(): + self.datacenter.resources[ck] = self.host_groups[ck] + elif ck in self.hosts.keys(): + self.datacenter.resources[ck] = self.hosts[ck] + + hgs = resource.get("host_groups") + if hgs: + for hgk, hg in hgs.iteritems(): + host_group = self.host_groups[hgk] + + pk = hg.get("parent") + if pk == self.datacenter.name: + host_group.parent_resource = self.datacenter + elif pk in self.host_groups.keys(): + host_group.parent_resource = self.host_groups[pk] + + for ck in hg.get("children"): + if ck in self.hosts.keys(): + host_group.child_resources[ck] = self.hosts[ck] + elif ck in self.host_groups.keys(): + host_group.child_resources[ck] = self.host_groups[ck] + + hs = resource.get("hosts") + if hs: + for hk, h in hs.iteritems(): + host = self.hosts[hk] + + pk = h.get("parent") + if pk == self.datacenter.name: + host.host_group = self.datacenter + elif pk in self.host_groups.keys(): + host.host_group = self.host_groups[pk] + + for _, g in self.groups.iteritems(): + for hk in g.member_hosts.keys(): + if hk not in self.hosts.keys() and \ + hk not in self.host_groups.keys(): + del g.member_hosts[hk] + + self._update_compute_avail() + + return "ok" + + def _load_groups(self, _groups): + """Take JSON group data as defined in /resources/group and + create Group instance. 
+ """ + + for gk, g in _groups.iteritems(): + group = Group(gk) + + group.status = g.get("status") + + group.uuid = g.get("uuid") + + group.group_type = g.get("group_type") + group.level = g.get("level") + group.factory = g.get("factory") + + rule_id = g.get("rule_id") + if rule_id != "none" and rule_id in self.group_rules.keys(): + group.rule = self.group_rules[rule_id] + + for mk, mv in g["metadata"].iteritems(): + group.metadata[mk] = mv + + for s_info in g["server_list"]: + group.server_list.append(s_info) + + for hk, server_list in g["member_hosts"].iteritems(): + group.member_hosts[hk] = [] + for s_info in server_list: + group.member_hosts[hk].append(s_info) + + self.groups[gk] = group + + def _load_flavors(self, _flavors): + """Take JSON flavor data as defined in /resources/flavor and + create Flavor instance. + """ + + for fk, f in _flavors.iteritems(): + flavor = Flavor(fk) + + flavor.status = f.get("status") + + flavor.flavor_id = f.get("flavor_id") + flavor.vCPUs = f.get("vCPUs") + flavor.mem_cap = f.get("mem") + flavor.disk_cap = f.get("disk") + for k, v in f["extra_specs"].iteritems(): + flavor.extra_specs[k] = v + + self.flavors[fk] = flavor + + def _load_hosts(self, _hosts): + """Take JSON host data as defined in /resources/host and + create Host instance. + """ + + for hk, h in _hosts.iteritems(): + host = Host(hk) + + host.status = h.get("status") + host.state = h.get("state") + + host.uuid = h.get("uuid") + + host.vCPUs = h.get("vCPUs") + host.original_vCPUs = h.get("original_vCPUs") + host.vCPUs_used = h.get("vCPUs_used") + host.avail_vCPUs = h.get("avail_vCPUs") + + host.mem_cap = h.get("mem") + host.original_mem_cap = h.get("original_mem") + host.free_mem_mb = h.get("free_mem_mb") + host.avail_mem_cap = h.get("avail_mem") + + host.local_disk_cap = h.get("local_disk") + host.original_local_disk_cap = h.get("original_local_disk") + host.free_disk_gb = h.get("free_disk_gb") + host.disk_available_least = h.get("disk_available_least") + host.avail_local_disk_cap = h.get("avail_local_disk") + + host.NUMA = NUMA(numa=h.get("NUMA")) + + for s_info in h["server_list"]: + host.server_list.append(s_info) + + for gk in h["membership_list"]: + if gk in self.groups.keys(): + host.memberships[gk] = self.groups[gk] + + # Not used by Valet currently, only capacity planning module + if "candidate_host_types" in h.keys(): + for htk, ht in h["candidate_host_types"].iteritems(): + host.candidate_host_types[htk] = ht + else: + host.candidate_host_types = {} + + self.hosts[hk] = host + + def _load_host_groups(self, _host_groups): + for hgk, hg in _host_groups.iteritems(): + host_group = HostGroup(hgk) + + host_group.status = hg.get("status") + + host_group.host_type = hg.get("host_type") + + host_group.vCPUs = hg.get("vCPUs") + host_group.avail_vCPUs = hg.get("avail_vCPUs") + + host_group.mem_cap = hg.get("mem") + host_group.avail_mem_cap = hg.get("avail_mem") + + host_group.local_disk_cap = hg.get("local_disk") + host_group.avail_local_disk_cap = hg.get("avail_local_disk") + + for s_info in hg["server_list"]: + host_group.server_list.append(s_info) + + for gk in hg.get("membership_list"): + if gk in self.groups.keys(): + host_group.memberships[gk] = self.groups[gk] + + self.host_groups[hgk] = host_group + + def _load_datacenter(self, _dc): + self.datacenter = Datacenter(_dc.get("name")) + + self.datacenter.status = _dc.get("status") + + self.datacenter.vCPUs = _dc.get("vCPUs") + self.datacenter.avail_vCPUs = _dc.get("avail_vCPUs") + + self.datacenter.mem_cap = _dc.get("mem") + 
self.datacenter.avail_mem_cap = _dc.get("avail_mem") + + self.datacenter.local_disk_cap = _dc.get("local_disk") + self.datacenter.avail_local_disk_cap = _dc.get("avail_local_disk") + + for s_info in _dc["server_list"]: + self.datacenter.server_list.append(s_info) + + for gk in _dc.get("membership_list"): + if gk in self.groups.keys(): + self.datacenter.memberships[gk] = self.groups[gk] + + def _update_compute_avail(self): + """Update amount of total available resources.""" + + self.CPU_avail = self.datacenter.avail_vCPUs + self.mem_avail = self.datacenter.avail_mem_cap + self.local_disk_avail = self.datacenter.avail_local_disk_cap + + def update_resource(self): + """Update resource status triggered by placements, events, and batch.""" + + for level in LEVEL: + for _, host_group in self.host_groups.iteritems(): + if host_group.host_type == level: + if host_group.is_available() and host_group.updated: + self._update_host_group(host_group) + + if self.datacenter.updated: + self._update_datacenter() + + self._update_compute_avail() + + def _update_host_group(self, _host_group): + """Update host group (rack) status.""" + + _host_group.init_resources() + del _host_group.server_list[:] + _host_group.init_memberships() + + for _, host in _host_group.child_resources.iteritems(): + if host.is_available(): + _host_group.vCPUs += host.vCPUs + _host_group.avail_vCPUs += host.avail_vCPUs + _host_group.mem_cap += host.mem_cap + _host_group.avail_mem_cap += host.avail_mem_cap + _host_group.local_disk_cap += host.local_disk_cap + _host_group.avail_local_disk_cap += host.avail_local_disk_cap + + for server_info in host.server_list: + _host_group.server_list.append(server_info) + + for gk in host.memberships.keys(): + _host_group.memberships[gk] = host.memberships[gk] + + def _update_datacenter(self): + """Update datacenter status.""" + + self.datacenter.init_resources() + del self.datacenter.server_list[:] + self.datacenter.memberships.clear() + + for _, resource in self.datacenter.resources.iteritems(): + if resource.is_available(): + self.datacenter.vCPUs += resource.vCPUs + self.datacenter.avail_vCPUs += resource.avail_vCPUs + self.datacenter.mem_cap += resource.mem_cap + self.datacenter.avail_mem_cap += resource.avail_mem_cap + self.datacenter.local_disk_cap += resource.local_disk_cap + self.datacenter.avail_local_disk_cap += resource.avail_local_disk_cap + + for s in resource.server_list: + self.datacenter.server_list.append(s) + + for gk in resource.memberships.keys(): + self.datacenter.memberships[gk] = resource.memberships[gk] + + def compute_resources(self, host): + """Compute the amount of resources with oversubsription ratios.""" + + ram_allocation_ratio_list = [] + cpu_allocation_ratio_list = [] + disk_allocation_ratio_list = [] + + for _, g in host.memberships.iteritems(): + if g.group_type == "aggr": + if g.name.startswith("valet:"): + metadata = g.metadata["prior_metadata"] + else: + metadata = g.metadata + + if "ram_allocation_ratio" in metadata.keys(): + if isinstance(metadata["ram_allocation_ratio"], list): + for r in metadata["ram_allocation_ratio"]: + ram_allocation_ratio_list.append(float(r)) + else: + ram_allocation_ratio_list.append(float(metadata["ram_allocation_ratio"])) + if "cpu_allocation_ratio" in metadata.keys(): + if isinstance(metadata["cpu_allocation_ratio"], list): + for r in metadata["cpu_allocation_ratio"]: + cpu_allocation_ratio_list.append(float(r)) + else: + cpu_allocation_ratio_list.append(float(metadata["cpu_allocation_ratio"])) + if "disk_allocation_ratio" in 
metadata.keys(): + if isinstance(metadata["disk_allocation_ratio"], list): + for r in metadata["disk_allocation_ratio"]: + disk_allocation_ratio_list.append(float(r)) + else: + disk_allocation_ratio_list.append(float(metadata["disk_allocation_ratio"])) + + ram_allocation_ratio = 1.0 + if len(ram_allocation_ratio_list) > 0: + ram_allocation_ratio = min(ram_allocation_ratio_list) + else: + if self.default_ram_allocation_ratio > 0: + ram_allocation_ratio = self.default_ram_allocation_ratio + + host.compute_mem(ram_allocation_ratio) + + cpu_allocation_ratio = 1.0 + if len(cpu_allocation_ratio_list) > 0: + cpu_allocation_ratio = min(cpu_allocation_ratio_list) + else: + if self.default_cpu_allocation_ratio > 0: + cpu_allocation_ratio = self.default_cpu_allocation_ratio + + host.compute_cpus(cpu_allocation_ratio) + + disk_allocation_ratio = 1.0 + if len(disk_allocation_ratio_list) > 0: + disk_allocation_ratio = min(disk_allocation_ratio_list) + else: + if self.default_disk_allocation_ratio > 0: + disk_allocation_ratio = self.default_disk_allocation_ratio + + host.compute_disk(disk_allocation_ratio) + + def compute_avail_resources(self, host): + """Compute available amount of resources after placements.""" + + status = host.compute_avail_mem() + if status != "ok": + self.logger.warning(status) + + status = host.compute_avail_cpus() + if status != "ok": + self.logger.warning(status) + + status = host.compute_avail_disk() + if status != "ok": + self.logger.warning(status) + + def mark_host_updated(self, _host_name): + """Mark the host updated.""" + + host = self.hosts[_host_name] + host.updated = True + + if host.host_group is not None: + if isinstance(host.host_group, HostGroup): + self.mark_host_group_updated(host.host_group.name) + else: + self.mark_datacenter_updated() + + def mark_host_group_updated(self, _name): + """Mark the host_group updated.""" + + host_group = self.host_groups[_name] + host_group.updated = True + + if host_group.parent_resource is not None: + if isinstance(host_group.parent_resource, HostGroup): + self.mark_host_group_updated(host_group.parent_resource.name) + else: + self.mark_datacenter_updated() + + def mark_datacenter_updated(self): + """Mark the datacenter updated.""" + + if self.datacenter is not None: + self.datacenter.updated = True + + def get_host_of_server(self, _s_info): + """Check and return host that hosts this server.""" + + host = None + + if len(self.change_of_placements) > 0: + if _s_info["stack_id"] != "none": + sid = _s_info["stack_id"] + ":" + _s_info["name"] + else: + sid = _s_info["uuid"] + + if sid in self.change_of_placements.keys(): + host_name = None + if "host" in self.change_of_placements[sid].keys(): + host_name = self.change_of_placements[sid]["host"] + elif "new_host" in self.change_of_placements[sid].keys(): + host_name = self.change_of_placements[sid]["new_host"] + + if host_name is not None: + host = self.hosts[host_name] + else: + for _, h in self.hosts.iteritems(): + if h.has_server(_s_info): + host = h + break + + return host + + def update_server_placements(self, change_of_placements=None, sync=False): + """Update hosts with the change of server placements. + + Update the available resources of host and NUMA if sync is True. 
+ """ + + if change_of_placements is None: + change_of_placements = self.change_of_placements + + for _, change in change_of_placements.iteritems(): + if "new_host" in change and "old_host" in change: + # Migration case + + old_host = self.hosts[change.get("old_host")] + new_host = self.hosts[change.get("new_host")] + + s_info = change.get("info") + old_info = old_host.get_server_info(s_info) + + if sync: + # Adjust available remaining amount. + + old_flavor = self.get_flavor(old_info.get("flavor_id")) + new_flavor = self.get_flavor(s_info.get("flavor_id")) + + if new_flavor is None or old_flavor is None: + # NOTE(Gueyoung): ignore at this time. + # return False + pass + else: + s_info["vcpus"] = new_flavor.vCPUs + s_info["mem"] = new_flavor.mem_cap + s_info["disk"] = new_flavor.disk_cap + + new_host.deduct_avail_resources(s_info) + + if new_flavor.need_numa_alignment(): + cell = new_host.NUMA.deduct_server_resources(s_info) + s_info["numa"] = cell + + old_info["vcpus"] = old_flavor.vCPUs + old_info["mem"] = old_flavor.mem_cap + old_info["disk"] = old_flavor.disk_cap + + old_host.rollback_avail_resources(old_info) + + if old_flavor.need_numa_alignment(): + old_host.NUMA.rollback_server_resources(old_info) + + old_host.remove_server(old_info) + + new_host.add_server(old_info) + new_host.update_server(s_info) + + self.mark_host_updated(change.get("new_host")) + self.mark_host_updated(change.get("old_host")) + + elif "new_host" in change and "old_host" not in change: + # New server case + + host = self.hosts[change.get("new_host")] + s_info = change.get("info") + + flavor = self.get_flavor(s_info.get("flavor_id")) + + if flavor is None: + # NOTE(Gueyoung): ignore at this time. + # return False + pass + else: + s_info["vcpus"] = flavor.vCPUs + s_info["mem"] = flavor.mem_cap + s_info["disk"] = flavor.disk_cap + + host.deduct_avail_resources(s_info) + + host.add_server(s_info) + + if sync: + if flavor is not None: + # Adjust available remaining amount. + if flavor.need_numa_alignment(): + host.NUMA.deduct_server_resources(s_info) + else: + if s_info.get("numa") != "none": + host.NUMA.add_server(s_info) + + self.mark_host_updated(change.get("new_host")) + + elif "new_host" not in change and "old_host" in change: + # Deletion case + + host = self.hosts[change.get("old_host")] + s_info = change.get("info") + + flavor = self.get_flavor(s_info.get("flavor_id")) + + if flavor is None: + # NOTE(Gueyoung): ignore at this time. + # return False + pass + else: + s_info["vcpus"] = flavor.vCPUs + s_info["mem"] = flavor.mem_cap + s_info["disk"] = flavor.disk_cap + + host.rollback_avail_resources(s_info) + + if flavor.need_numa_alignment(): + host.NUMA.rollback_server_resources(s_info) + + host.remove_server(s_info) + + self.mark_host_updated(change.get("old_host")) + + else: + # Update case + + host = self.hosts[change.get("host")] + s_info = change.get("info") + + if sync: + # Adjust available remaining amount. + + old_info = host.get_server_info(s_info) + + if s_info["flavor_id"] != old_info["flavor_id"]: + old_flavor = self.get_flavor(old_info.get("flavor_id")) + new_flavor = self.get_flavor(s_info.get("flavor_id")) + + if old_flavor is None or new_flavor is None: + # NOTE(Gueyoung): ignore at this time. 
+ # return False + pass + else: + host.rollback_avail_resources(old_info) + + if old_flavor.need_numa_alignment(): + host.NUMA.rollback_server_resources(old_info) + + s_info["vcpus"] = new_flavor.vCPUs + s_info["mem"] = new_flavor.mem_cap + s_info["disk"] = new_flavor.disk_cap + + host.deduct_avail_resources(s_info) + + if new_flavor.need_numa_alignment(): + cell = host.NUMA.deduct_server_resources(s_info) + s_info["numa"] = cell + + new_info = host.update_server(s_info) + + if new_info is not None: + self.mark_host_updated(change.get("host")) + + return True + + def update_server_grouping(self, change_of_placements=None, new_groups=None): + """Update group member_hosts and hosts' memberships + + Caused by server addition, deletion, and migration. + """ + + if change_of_placements is None: + change_of_placements = self.change_of_placements + + if new_groups is None: + new_groups = self._get_new_grouping() + + for _, placement in change_of_placements.iteritems(): + if "new_host" in placement.keys() and "old_host" in placement.keys(): + # Migrated server. This server can be unknown one previously. + + old_host = self.hosts[placement.get("old_host")] + new_host = self.hosts[placement.get("new_host")] + s_info = placement.get("info") + new_info = new_host.get_server_info(s_info) + + # A list of Valet groups + group_list = [] + self.get_groups_of_server(old_host, new_info, group_list) + + _group_list = self._get_groups_of_server(new_info, new_groups) + for gk in _group_list: + if gk not in group_list: + group_list.append(gk) + + self._remove_server_from_groups(old_host, new_info) + + self._add_server_to_groups(new_host, new_info, group_list) + + elif "new_host" in placement.keys() and "old_host" not in placement.keys(): + # New server + + new_host = self.hosts[placement.get("new_host")] + s_info = placement.get("info") + new_s_info = new_host.get_server_info(s_info) + + group_list = self._get_groups_of_server(new_s_info, new_groups) + + self._add_server_to_groups(new_host, new_s_info, group_list) + + elif "new_host" not in placement.keys() and "old_host" in placement.keys(): + # Deleted server. This server can be unknown one previously. + + # Enabled host + host = self.hosts[placement["old_host"]] + + self._remove_server_from_groups(host, placement.get("info")) + + else: + host_name = placement.get("host") + s_info = placement.get("info") + + if host_name in self.hosts.keys(): + host = self.hosts[host_name] + new_info = host.get_server_info(s_info) + + if new_info is not None: + self._update_server_in_groups(host, new_info) + + # To create, delete, and update dynamic Host-Aggregates. + # TODO(Gueyoung): return error if fail to connect to Nova. + self._manage_dynamic_host_aggregates() + + def _get_new_grouping(self, change_of_placements=None): + """Verify and get new hosts' memberships.""" + + if change_of_placements is None: + change_of_placements = self.change_of_placements + + new_groups = {} + + # TODO: grouping verification for 'new' servers. + # by calling verify_pre_valet_placements() + # Should add each host's new memberships. + + # Add host's memberships for server-group. + # Do not need to verify. 
+ for _, placement in change_of_placements.iteritems(): + if "new_host" in placement.keys(): + host = self.hosts[placement.get("new_host")] + s_info = placement.get("info") + new_info = host.get_server_info(s_info) + + for gk, g in self.groups.iteritems(): + if g.factory == "server-group" and g.status == "enabled": + if g.has_server_uuid(new_info.get("uuid")): + if gk not in host.memberships.keys(): + host.memberships[gk] = g + self.mark_host_updated(host.name) + + if gk not in new_groups.keys(): + new_groups[gk] = [] + new_groups[gk].append(new_info) + + return new_groups + + def _get_groups_of_server(self, _s_info, new_groups): + """Check and return group list where server belongs to.""" + + group_list = [] + + _stack_id = _s_info.get("stack_id") + _stack_name = _s_info.get("stack_name") + _uuid = _s_info.get("uuid") + _name = _s_info.get("name") + + for gk, server_list in new_groups.iteritems(): + for s_info in server_list: + if s_info["uuid"] != "none": + if s_info["uuid"] == _uuid: + if gk not in group_list: + group_list.append(gk) + break + + if s_info["name"] != "none": + if s_info["stack_id"] != "none": + if s_info["stack_id"] == _stack_id and \ + s_info["name"] == _name: + if gk not in group_list: + group_list.append(gk) + break + + if s_info["stack_name"] != "none": + if s_info["stack_name"] == _stack_name and \ + s_info["name"] == _name: + if gk not in group_list: + group_list.append(gk) + break + + return group_list + + def get_groups_of_server(self, _host, _s_info, _group_list): + """Get groups where the server is assigned.""" + + for gk in _host.memberships.keys(): + if gk not in self.groups.keys() or self.groups[gk].status != "enabled": + del _host.memberships[gk] + if isinstance(_host, Host): + self.mark_host_updated(_host.name) + elif isinstance(_host, HostGroup): + self.mark_host_group_updated(_host.name) + else: + self.mark_datacenter_updated() + continue + + g = self.groups[gk] + + if g.factory not in ("valet", "server-group"): + continue + + if isinstance(_host, HostGroup): + if g.level != _host.host_type: + continue + + if g.has_server_in_host(_host.name, _s_info): + if gk not in _group_list: + _group_list.append(gk) + + if isinstance(_host, Host) and _host.host_group is not None: + if _host.host_group.is_available(): + self.get_groups_of_server(_host.host_group, _s_info, _group_list) + elif isinstance(_host, HostGroup) and _host.parent_resource is not None: + if _host.parent_resource.is_available(): + if isinstance(_host.parent_resource, HostGroup): + self.get_groups_of_server(_host.parent_resource, _s_info, _group_list) + + def _add_server_to_groups(self, _host, _s_info, _groups): + """Add new server into groups.""" + + for gk in _groups: + # The group must be verified for host membership + if gk not in _host.memberships.keys(): + continue + + if gk not in self.groups.keys() or self.groups[gk].status != "enabled": + del _host.memberships[gk] + if isinstance(_host, Host): + self.mark_host_updated(_host.name) + elif isinstance(_host, HostGroup): + self.mark_host_group_updated(_host.name) + else: + self.mark_datacenter_updated() + continue + + g = self.groups[gk] + + if g.factory not in ("valet", "server-group"): + continue + + if isinstance(_host, HostGroup): + if g.level != _host.host_type: + continue + + if g.factory == "server-group": + g.clean_server(_s_info["uuid"], _host.name) + + if g.add_server(_s_info, _host.name): + g.updated = True + else: + self.logger.warning("server already exists in group") + + if isinstance(_host, Host) and _host.host_group is not 
None: + if _host.host_group.is_available(): + self._add_server_to_groups(_host.host_group, _s_info, _groups) + elif isinstance(_host, HostGroup) and _host.parent_resource is not None: + if _host.parent_resource.is_available(): + if isinstance(_host.parent_resource, HostGroup): + self._add_server_to_groups(_host.parent_resource, _s_info, _groups) + + def _remove_server_from_groups(self, _host, _s_info): + """Remove server from related groups.""" + + for gk in _host.memberships.keys(): + if gk not in self.groups.keys() or self.groups[gk].status != "enabled": + del _host.memberships[gk] + + if isinstance(_host, Host): + self.mark_host_updated(_host.name) + elif isinstance(_host, HostGroup): + self.mark_host_group_updated(_host.name) + else: + self.mark_datacenter_updated() + continue + + g = self.groups[gk] + + if g.factory not in ("valet", "server-group"): + continue + + if isinstance(_host, HostGroup): + if g.level != _host.host_type: + continue + + if g.remove_server(_s_info): + g.updated = True + + if g.remove_server_from_host(_host.name, _s_info): + g.updated = True + + # Remove host from group's membership if the host has no servers of the group. + if g.remove_member(_host.name): + g.updated = True + + # Remove group from host's membership if group does not have the host + # Not consider group has datacenter level. + if isinstance(_host, Host) or isinstance(_host, HostGroup): + if _host.remove_membership(g): + if isinstance(_host, Host): + self.mark_host_updated(_host.name) + elif isinstance(_host, HostGroup): + self.mark_host_group_updated(_host.name) + else: + self.mark_datacenter_updated() + + if len(g.server_list) == 0: + g.status = "disabled" + g.updated = True + + if isinstance(_host, Host) and _host.host_group is not None: + if _host.host_group.is_available(): + self._remove_server_from_groups(_host.host_group, _s_info) + elif isinstance(_host, HostGroup) and _host.parent_resource is not None: + if _host.parent_resource.is_available(): + if isinstance(_host.parent_resource, HostGroup): + self._remove_server_from_groups(_host.parent_resource, _s_info) + + def _update_server_in_groups(self, _host, _s_info): + """Update server info in groups.""" + + for gk in _host.memberships.keys(): + if gk not in self.groups.keys() or self.groups[gk].status != "enabled": + del _host.memberships[gk] + if isinstance(_host, Host): + self.mark_host_updated(_host.name) + elif isinstance(_host, HostGroup): + self.mark_host_group_updated(_host.name) + else: + self.mark_datacenter_updated() + continue + + g = self.groups[gk] + + if g.factory not in ("valet", "server-group"): + continue + + if isinstance(_host, HostGroup): + if g.level != _host.host_type: + continue + + if g.update_server(_s_info): + g.update_server_in_host(_host.name, _s_info) + g.updated = True + + if isinstance(_host, Host) and _host.host_group is not None: + if _host.host_group.is_available(): + self._update_server_in_groups(_host.host_group, _s_info) + elif isinstance(_host, HostGroup) and _host.parent_resource is not None: + if _host.parent_resource.is_available(): + if isinstance(_host.parent_resource, HostGroup): + self._update_server_in_groups(_host.parent_resource, _s_info) + + def add_group(self, _g_name, _g_type, _level, _factory, _host_name): + """Add/Enable group unless the group exists or disabled.""" + + if _g_name not in self.groups.keys(): + group = Group(_g_name) + group.group_type = _g_type + group.factory = _factory + group.level = _level + group.rule = self._get_rule_of_group(_g_name) + group.new = True + 
group.updated = True + self.groups[_g_name] = group + elif self.groups[_g_name].status != "enabled": + self.groups[_g_name].status = "enabled" + self.groups[_g_name].updated = True + + if _host_name in self.hosts.keys(): + host = self.hosts[_host_name] + else: + host = self.host_groups[_host_name] + + # Update host memberships. + if host is not None: + if _g_name not in host.memberships.keys(): + host.memberships[_g_name] = self.groups[_g_name] + + if isinstance(host, Host): + self.mark_host_updated(_host_name) + elif isinstance(host, HostGroup): + self.mark_host_group_updated(_host_name) + + return True + + def _get_rule_of_group(self, _gk): + """Get valet group rule of the given group.""" + + rule_name_elements = _gk.split(':') + rule_name = rule_name_elements[len(rule_name_elements)-1] + + if rule_name in self.group_rules.keys(): + return self.group_rules[rule_name] + + return None + + def get_group_by_uuid(self, _uuid): + """Check and get the group with its uuid.""" + + for _, g in self.groups.iteritems(): + if g.uuid == _uuid: + return g + + return None + + def check_valid_rules(self, _tenant_id, _rule_list, use_ex=True): + """Check if given rules are valid to be used.""" + + for rk in _rule_list: + if rk not in self.group_rules.keys(): + return "not exist rule (" + rk + ")" + + # TODO(Gueyoung): if disabled, + # what to do with placed servers under this rule? + if self.group_rules[rk].status != "enabled": + return "rule (" + rk + ") is not enabled" + + if not use_ex: + if self.group_rules[rk].rule_type == "exclusivity": + return "exclusivity not supported" + + rule = self.group_rules[rk] + if len(rule.members) > 0 and _tenant_id not in rule.members: + return "no valid tenant to use rule (" + rk + ")" + + return "ok" + + def _manage_dynamic_host_aggregates(self): + """Create, delete, or update Host-Aggregates after placement decisions.""" + + for gk in self.groups.keys(): + g = self.groups[gk] + if g.group_type == "exclusivity" and g.status == "enabled": + aggr_name = "valet:" + g.name + if aggr_name not in self.groups.keys(): + # Create Host-Aggregate. + status = self._add_exclusivity_aggregate(aggr_name, g) + # TODO(Gueyoung): return error + if status != "ok": + self.logger.warning("error while adding dynamic host-aggregate") + else: + dha = self.groups[aggr_name] + for hk in g.member_hosts.keys(): + if hk not in dha.member_hosts.keys(): + # Add new host into Host-Aggregate. + status = self._update_exclusivity_aggregate(dha, + self.hosts[hk]) + # TODO(Gueyoung): return error + if status != "ok": + self.logger.warning("error while updating dynamic host-aggregate") + + for gk in self.groups.keys(): + g = self.groups[gk] + if g.group_type == "aggr" and g.status == "enabled": + if g.name.startswith("valet:"): + if g.metadata["valet_type"] == "exclusivity": + name_elements = g.name.split(':', 1) + ex_group_name = name_elements[1] + if ex_group_name not in self.groups.keys() or \ + self.groups[ex_group_name].status != "enabled": + # Delete Host-Aggregate + status = self._remove_exclusivity_aggregate(g) + # TODO(Gueyoung): return error + if status != "ok": + self.logger.warning("error while removing dynamic host-aggregate") + else: + ex_group = self.groups[ex_group_name] + for hk in g.member_hosts.keys(): + if hk not in ex_group.member_hosts.keys(): + # Remove host from Host-Aggregate. 
+ status = self._remove_host_from_exclusivity_aggregate(g, + self.hosts[hk]) + + # TODO(Gueyoung): return error + if status != "ok": + self.logger.warning("error while removing host from dynamic host-aggregate") + + def _add_exclusivity_aggregate(self, _name, _group): + """Create platform Host-Aggregate for Valet rules. + + Exclusivity: create Host-Aggregate, and lock. + """ + + group = Group(_name) + group.group_type = "aggr" + group.level = "host" + group.factory = "nova" + + metadata = {"valet_type": "exclusivity"} + + new_host_list = [] + ex_metadata = {} + + for hk in _group.member_hosts.keys(): + host = self.hosts[hk] + aggregates = host.get_aggregates() + + old_aggregates = [] + for a in aggregates: + if a.name.startswith("valet:"): + continue + + for mk, mv in a.metadata.iteritems(): + if mk not in ex_metadata.keys(): + ex_metadata[mk] = mv + else: + if isinstance(ex_metadata[mk], list): + if mv not in ex_metadata[mk]: + ex_metadata[mk].append(mv) + self.logger.warning("multiple values of metadata key") + else: + if mv != ex_metadata[mk]: + value_list = [ex_metadata[mk], mv] + ex_metadata[mk] = value_list + self.logger.warning("multiple values of metadata key") + + old_aggregates.append(a) + + if hk in a.member_hosts.keys(): + del a.member_hosts[hk] + a.updated = True + + if a.name in host.memberships.keys(): + del host.memberships[a.name] + + if len(old_aggregates) > 0: + metadata[hk] = str(old_aggregates[0].uuid) + for i in range(1, len(old_aggregates)): + metadata[hk] += ("," + str(old_aggregates[i].uuid)) + + new_host_list.append(host) + + metadata["prior_metadata"] = ex_metadata + + group.metadata = metadata + + for host in new_host_list: + group.member_hosts[host.name] = [] + + host.memberships[_name] = group + self.mark_host_updated(host.name) + + group.updated = True + + if not self.metadata.source.valid_client(self.datacenter_url): + self.metadata.source.set_client(self.datacenter_url) + + status = self.metadata.create_exclusive_aggregate(group, + new_host_list) + + self.groups[_name] = group + + return status + + def _update_exclusivity_aggregate(self, _group, _host): + """Update platform Host-Aggregate for Valet rules. + + Exclusivity: update Host-Aggregate, and lock. 
+ """ + + status = "ok" + + aggregates = _host.get_aggregates() + + if _group.group_type == "aggr": + if _host.name not in _group.member_hosts.keys(): + old_aggregates = [] + ex_metadata = _group.metadata["prior_metadata"] + + for a in aggregates: + if a.name.startswith("valet:"): + continue + + for mk, mv in a.metadata.iteritems(): + if mk not in ex_metadata.keys(): + ex_metadata[mk] = mv + else: + if isinstance(ex_metadata[mk], list): + if mv not in ex_metadata[mk]: + ex_metadata[mk].append(mv) + self.logger.warning("multiple values of metadata key") + else: + if mv != ex_metadata[mk]: + value_list = [ex_metadata[mk], mv] + ex_metadata[mk] = value_list + self.logger.warning("multiple values of metadata key") + + old_aggregates.append(a) + + if _host.name in a.member_hosts.keys(): + del a.member_hosts[_host.name] + a.updated = True + + if a.name in _host.memberships.keys(): + del _host.memberships[a.name] + + if len(old_aggregates) > 0: + _group.metadata[_host.name] = str(old_aggregates[0].uuid) + for i in range(1, len(old_aggregates)): + _group.metadata[_host.name] += ("," + str(old_aggregates[i].uuid)) + + _group.metadata["prior_metadata"] = ex_metadata + + _group.member_hosts[_host.name] = [] + _group.updated = True + + _host.memberships[_group.name] = _group + self.mark_host_updated(_host.name) + + if not self.metadata.source.valid_client(self.datacenter_url): + self.metadata.source.set_client(self.datacenter_url) + + status = self.metadata.update_exclusive_aggregate(_group.uuid, + _group.metadata, + _host.name, + old_aggregates) + + return status + + def _remove_exclusivity_aggregate(self, _group): + """Remove dynamic Host-Aggregate.""" + + for hk in _group.member_hosts.keys(): + host = self.hosts[hk] + + status = self._remove_host_from_exclusivity_aggregate(_group, host) + if status != "ok": + self.logger.warning("error while removing host from dynamic host-aggregate") + + del self.groups[_group.name] + + if not self.metadata.source.valid_client(self.datacenter_url): + self.metadata.source.set_client(self.datacenter_url) + + return self.metadata.remove_exclusive_aggregate(_group.uuid) + + def _remove_host_from_exclusivity_aggregate(self, _group, _host): + """Update platform Host-Aggregate for Valet rules. + + Exclusivity: delete host from dynamic Host-Aggregate. + """ + + status = "ok" + + if _group.group_type == "aggr": + if _host.name in _group.member_hosts.keys(): + old_aggregates = [] + if _host.name in _group.metadata.keys(): + aggr_ids = _group.metadata[_host.name].split(',') + + for aid in aggr_ids: + aggr = self.get_group_by_uuid(int(aid)) + if aggr is not None: + aggr.member_hosts[_host.name] = [] + aggr.updated = True + old_aggregates.append(aggr) + + if aggr.name not in _host.memberships.keys(): + _host.memberships[aggr.name] = aggr + + _group.metadata[_host.name] = "" + + del _group.member_hosts[_host.name] + _group.updated = True + + del _host.memberships[_group.name] + self.mark_host_updated(_host.name) + + if not self.metadata.source.valid_client(self.datacenter_url): + self.metadata.source.set_client(self.datacenter_url) + + status = self.metadata.remove_host_from_exclusive_aggregate(_group.uuid, + _group.metadata, + _host.name, + old_aggregates) + + return status + + def sync_with_platform(self, store=False): + """Communicate with platform (e.g., nova) to get resource status. + + Due to dependencies between resource types, + keep the following order of process. 
+ """ + + if len(self.pending_requests) > 0: + return True + + self.logger.info("load data from platform (e.g., nova)") + + # Set the platorm client lib (e.g., novaclient). + if not self.metadata.source.valid_client(self.datacenter_url): + count = 0 + while count < 3: + if not self.metadata.source.set_client(self.datacenter_url): + self.logger.warning("fail to set novaclient: try again") + count += 1 + time.sleep(1) + else: + break + if count == 3: + self.logger.error("fail to set novaclient") + return False + + count = 0 + while count < 3: + # Set each flavor and its metadata. + if not self.metadata.get_flavors(self): + self.logger.warning("fail to get flavors: try again") + count += 1 + time.sleep(1) + else: + break + if count == 3: + self.logger.error("fail to get flavors") + return False + + count = 0 + while count < 3: + # Set each compute host and servers information. + if not self.compute.get_hosts(self): + self.logger.warning("fail to get hosts: try again") + count += 1 + time.sleep(1) + else: + break + if count == 3: + self.logger.error("fail to get hosts") + return False + + # TODO(Gueyoung): need to every time? + # Set the layout between each compute host and rack. + if not self.topology.get_topology(self): + return False + + count = 0 + while count < 3: + # Set the availability-zone, host-aggregate, and server-group + # of each compute host. + if not self.metadata.get_groups(self): + self.logger.warning("fail to get groups: try again") + count += 1 + time.sleep(1) + else: + break + if count == 3: + self.logger.error("fail to get groups") + return False + + # Update total capacities of each host. + # Triggered by overcommit ratio update or newly added. + for _, host in self.hosts.iteritems(): + if host.is_available() and host.updated: + self.compute_resources(host) + + # Update server placements in hosts + # If sync is True, update the available capacities. + if not self.update_server_placements(sync=True): + return False + + # Update the available capacities of each NUMA and host. + # Triggered by unknown server additions and deletions. + for _, host in self.hosts.iteritems(): + if host.is_available() and host.updated: + self.compute_avail_resources(host) + + # Update server grouping changed by deletion and migration of servers. + # TODO(Gueyoung): return False if fail to connect to Nova. + self.update_server_grouping() + + # Update racks (and clusters) and datacenter based on host change. + self.update_resource() + + # TODO: If peoridic batches to collect data from platform is activated, + # should check if there is any update before storing data into DB. + if store: + self.store_resource() + + return True + + def get_flavor(self, _id): + """Get a flavor info.""" + + if isinstance(_id, six.string_types): + flavor_id = _id + else: + flavor_id = str(_id) + + self.logger.debug("fetching flavor = " + flavor_id) + + flavor = None + if flavor_id in self.flavors.keys(): + flavor = self.flavors[flavor_id] + else: + for _, f in self.flavors.iteritems(): + if f.flavor_id == flavor_id: + flavor = f + break + + if flavor is not None: + # Check if detailed information. + # TODO(Gueyoung): what if flavor specs changed from platform? 
+ if flavor.vCPUs == 0: + if not self.metadata.source.valid_client(self.datacenter_url): + count = 0 + while count < 3: + if not self.metadata.source.set_client(self.datacenter_url): + self.logger.warning("fail to set novaclient: try again") + count += 1 + time.sleep(1) + else: + break + if count == 3: + self.logger.error("fail to set novaclient") + return None + + f = self.metadata.source.get_flavor(flavor.flavor_id) + if f is None: + flavor = None + else: + flavor.set_info(f) + flavor.updated = True + + self.logger.debug("flavor (" + flavor.name + ") fetched") + else: + self.logger.warning("unknown flavor = " + flavor_id) + + return flavor + + def store_resource(self, opt=None, req_id=None): + """Store resource status into DB.""" + + flavor_updates = {} + group_updates = {} + host_updates = {} + host_group_updates = {} + + # Do not store disabled resources. + + for fk, flavor in self.flavors.iteritems(): + # TODO(Gueyoung): store disabled flavor? + flavor_updates[fk] = flavor.get_json_info() + + for gk, group in self.groups.iteritems(): + if group.status == "enabled": + if group.factory != "valet": + group_updates[gk] = group.get_json_info() + + for hk, host in self.hosts.iteritems(): + if host.is_available(): + host_updates[hk] = host.get_json_info() + + for hgk, host_group in self.host_groups.iteritems(): + if host_group.is_available(): + host_group_updates[hgk] = host_group.get_json_info() + + datacenter_update = self.datacenter.get_json_info() + + # If there are pending requests (i.e., neither confirmed nor rolled back), + # do NOT sync with platform when dealing with a new request. + # Here, add/remove the request to/from the pending list + # to track the list of pending requests. + if opt is not None and req_id is not None: + if opt in ("create", "delete", "update"): + self.pending_requests.append(req_id) + elif opt in ("confirm", "rollback"): + for rid in self.pending_requests: + if rid == req_id: + self.pending_requests.remove(rid) + break + + json_update = {'flavors': flavor_updates, 'groups': group_updates, 'hosts': host_updates, + 'host_groups': host_group_updates, 'datacenter': datacenter_update} + + if self.new: + if not self.dbh.create_resource(self.datacenter_id, + self.datacenter_url, + self.pending_requests, + json_update): + return False + else: + if not self.dbh.update_resource(self.datacenter_id, + self.datacenter_url, + self.pending_requests, + json_update): + return False + + if self.new: + self.logger.debug("new datacenter = " + self.datacenter_id) + self.logger.debug(" url = " + self.datacenter_url) + else: + self.logger.debug("updated datacenter = " + self.datacenter_id) + self.logger.debug(" url = " + self.datacenter_url) + self.logger.debug("region = " + json.dumps(json_update['datacenter'], indent=4)) + self.logger.debug("racks = " + json.dumps(json_update['host_groups'], indent=4)) + self.logger.debug("hosts = " + json.dumps(json_update['hosts'], indent=4)) + self.logger.debug("groups = " + json.dumps(json_update['groups'], indent=4)) + self.logger.debug("flavors = ") + for fk, f_info in json_update['flavors'].iteritems(): + if f_info["vCPUs"] > 0: + self.logger.debug(json.dumps(f_info, indent=4)) + + updated_valet_groups = {} + new_valet_groups = {} + deleted_valet_groups = {} + for gk, group in self.groups.iteritems(): + if group.status == "enabled": + if group.factory == "valet": + if group.new: + new_valet_groups[gk] = group.get_json_info() + elif group.updated: + updated_valet_groups[gk] = group.get_json_info() + else: + if group.factory == "valet": + 
deleted_valet_groups[gk] = group.get_json_info() + + for gk, g_info in new_valet_groups.iteritems(): + if not self.dbh.create_valet_group(gk, g_info): + return False + + self.logger.debug("new valet group = " + gk) + self.logger.debug("info = " + json.dumps(g_info, indent=4)) + + for gk, g_info in updated_valet_groups.iteritems(): + if not self.dbh.update_valet_group(gk, g_info): + return False + + self.logger.debug("updated valet group = " + gk) + self.logger.debug("info = " + json.dumps(g_info, indent=4)) + + for gk, g_info in deleted_valet_groups.iteritems(): + if not self.dbh.delete_valet_group(gk): + return False + + self.logger.debug("deleted valet group = " + gk) + self.logger.debug("info = " + json.dumps(g_info, indent=4)) + + return True diff --git a/engine/src/valet/engine/resource_manager/resource_handler.py b/engine/src/valet/engine/resource_manager/resource_handler.py new file mode 100644 index 0000000..38868c7 --- /dev/null +++ b/engine/src/valet/engine/resource_manager/resource_handler.py @@ -0,0 +1,299 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +import json + +from valet.engine.resource_manager.resource import Resource +from valet.engine.resource_manager.resources.group_rule import GroupRule +from valet.engine.resource_manager.resources.host_group import HostGroup + + +class ResourceHandler: + """Handler for dealing with all existing datacenters and their resources.""" + + def __init__(self, _tid, _dbh, _compute, _metadata, _topology, + _config, _logger): + self.end_of_process = False + + self.dbh = _dbh + + self.compute = _compute + self.metadata = _metadata + self.topology = _topology + + self.default_cpu_allocation_ratio = _config.get("default_cpu_allocation_ratio") + self.default_ram_allocation_ratio = _config.get("default_ram_allocation_ratio") + self.default_disk_allocation_ratio = _config.get("default_disk_allocation_ratio") + self.batch_sync_interval = _config.get("batch_sync_interval") + + self.group_rules = {} + self.resource_list = [] + + self.logger = _logger + + def load_group_rules_from_db(self): + """Get all defined valet group rules from DB. + + Note that rules are applied to all datacenters. 
+ """ + + # Init first + self.group_rules = {} + + rule_list = self.dbh.get_group_rules() + if rule_list is None: + return None + + for r in rule_list: + rule = GroupRule(r.get("id")) + + rule.status = r.get("status") + + rule.app_scope = r.get("app_scope") + rule.rule_type = r.get("type") + rule.level = r.get("level") + rule.members = json.loads(r.get("members")) + rule.desc = r.get("description") + + self.group_rules[rule.rule_id] = rule + + return "ok" + + def load_group_rule_from_db(self, _id): + """Get valet group rule from DB.""" + + # Init first + self.group_rules = {} + + r = self.dbh.get_group_rule(_id) + if r is None: + return None + elif len(r) == 0: + return "rule not found" + + rule = GroupRule(r.get("id")) + + rule.status = r.get("status") + + rule.app_scope = r.get("app_scope") + rule.rule_type = r.get("type") + rule.level = r.get("level") + rule.members = json.loads(r.get("members")) + rule.desc = r.get("description") + + self.group_rules[rule.rule_id] = rule + + return "ok" + + def create_group_rule(self, _name, _scope, _type, _level, _members, _desc): + """Create a new group rule in DB.""" + + r = self.dbh.get_group_rule(_name) + if r is None: + return None + elif len(r) > 0: + return "rule already exists" + + if not self.dbh.create_group_rule(_name, _scope, _type, _level, + _members, _desc): + return None + + return "ok" + + def get_rules(self): + """Return basic info of valet rules.""" + + rule_list = [] + + valet_group_list = self.dbh.get_valet_groups() + if valet_group_list is None: + return None + + for rk, rule in self.group_rules.iteritems(): + rule_info = self._get_rule(rule) + + for vg in valet_group_list: + if vg["rule_id"] == rk: + gk = vg.get("id") + gk_elements = gk.split(":") + dc_id = gk_elements[0] + + if dc_id not in rule_info["regions"]: + rule_info["regions"].append(dc_id) + + rule_list.append(rule_info) + + return rule_list + + def _get_rule(self, _rule): + """Return rule info.""" + + rule_info = {} + + rule_info["id"] = _rule.rule_id + rule_info["type"] = _rule.rule_type + rule_info["app_scope"] = _rule.app_scope + rule_info["level"] = _rule.level + rule_info["members"] = _rule.members + rule_info["description"] = _rule.desc + rule_info["status"] = _rule.status + rule_info["regions"] = [] + + return rule_info + + def get_placements_under_rule(self, _rule_name, _resource): + """Get server placements info under given rule in datacenter.""" + + placements = {} + + rule = self.group_rules[_rule_name] + + for gk, g in _resource.groups.iteritems(): + if g.factory == "valet": + if g.rule.rule_id == _rule_name: + placements[gk] = self._get_placements(g, _resource) + + result = {} + result["id"] = rule.rule_id + result["type"] = rule.rule_type + result["app_scope"] = rule.app_scope + result["level"] = rule.level + result["members"] = rule.members + result["description"] = rule.desc + result["status"] = rule.status + result["placements"] = placements + + return result + + def _get_placements(self, _g, _resource): + """Get placement info of servers in group.""" + + placements = {} + + for hk, server_list in _g.member_hosts.iteritems(): + for s_info in server_list: + sid = s_info.get("stack_name") + ":" + s_info.get("name") + placements[sid] = {} + placements[sid]["region"] = _resource.datacenter_id + + if hk in _resource.hosts.keys(): + host = _resource.hosts[hk] + + placements[sid]["host"] = host.name + + hg = host.host_group + if isinstance(hg, HostGroup) and hg.host_type == "rack": + placements[sid]["rack"] = hg.name + else: + placements[sid]["rack"] = "na" + 
+ az = host.get_availability_zone() + az_name_elements = az.name.split(':', 1) + if len(az_name_elements) > 1: + az_name = az_name_elements[1] + else: + az_name = az.name + placements[sid]["availability-zone"] = az_name + + elif hk in _resource.host_groups.keys(): + hg = _resource.host_groups[hk] + + if hg.host_type == "rack": + placements[sid]["rack"] = hg.name + + for hhk, host in hg.child_resources.iteritems(): + if host.has_server(s_info): + placements[sid]["host"] = host.name + + az = host.get_availability_zone() + az_name_elements = az.name.split(':', 1) + if len(az_name_elements) > 1: + az_name = az_name_elements[1] + else: + az_name = az.name + placements[sid]["availability-zone"] = az_name + + break + else: + # TODO(Gueyoung): Look for az, rack and host + placements[sid]["availability-zone"] = "na" + placements[sid]["rack"] = "na" + placements[sid]["host"] = "na" + + else: + placements[sid]["availability-zone"] = "na" + placements[sid]["rack"] = "na" + placements[sid]["host"] = "na" + + return placements + + def load_resource(self, _datacenter): + """Create a resource for placement decisions + + in a given target datacenter. + """ + + # Init first + del self.resource_list[:] + + resource = Resource(_datacenter, self.dbh, + self.compute, self.metadata, self.topology, + self.logger) + + resource.set_config(self.default_cpu_allocation_ratio, + self.default_ram_allocation_ratio, + self.default_disk_allocation_ratio) + + resource.set_group_rules(self.group_rules) + + status = resource.load_resource_from_db() + if status is None: + return False + elif status != "ok": + self.logger.warning(status) + resource.new = True + + self.resource_list.append(resource) + + return True + + def load_resource_with_rule(self, _datacenter): + """Create and return a resource with valet group rule.""" + + # Init first + del self.resource_list[:] + + resource = Resource(_datacenter, self.dbh, + self.compute, self.metadata, self.topology, + self.logger) + + resource.set_config(self.default_cpu_allocation_ratio, + self.default_ram_allocation_ratio, + self.default_disk_allocation_ratio) + + resource.set_group_rules(self.group_rules) + + status = resource.load_resource_from_db() + if status is None: + return None + elif status != "ok": + return status + + self.resource_list.append(resource) + + return "ok" diff --git a/engine/src/valet/engine/resource_manager/topology_manager.py b/engine/src/valet/engine/resource_manager/topology_manager.py new file mode 100644 index 0000000..f8422d3 --- /dev/null +++ b/engine/src/valet/engine/resource_manager/topology_manager.py @@ -0,0 +1,237 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# ------------------------------------------------------------------------- +# +#!/bin/python + + +from valet.engine.resource_manager.resources.datacenter import Datacenter +from valet.engine.resource_manager.resources.host_group import HostGroup + + +class TopologyManager(object): + """Manager to maintain the layout of datacenter.""" + + def __init__(self, _source, _logger): + self.source = _source + + self.datacenter = None + self.host_groups = {} + self.hosts = {} + + self.logger = _logger + + def get_topology(self, _resource): + """Set datacenter layout into resource.""" + + self.logger.info("set datacenter layout...") + + # Init first + self.datacenter = Datacenter(_resource.datacenter_id) + self.host_groups.clear() + self.hosts.clear() + + if self.source.get_topology(self.datacenter, self.host_groups, self.hosts, + _resource.hosts) != "ok": + return False + + self._check_updated(_resource) + + return True + + def _check_updated(self, _resource): + """Check if the layout is changed.""" + + if _resource.datacenter is None: + _resource.datacenter = Datacenter(_resource.datacenter_id) + _resource.datacenter.updated = True + + self.logger.info("new datacenter (" + _resource.datacenter_id + ") added") + + for hgk in self.host_groups.keys(): + if hgk not in _resource.host_groups.keys(): + new_host_group = HostGroup(hgk) + new_host_group.host_type = self.host_groups[hgk].host_type + + _resource.host_groups[new_host_group.name] = new_host_group + _resource.mark_host_group_updated(hgk) + + self.logger.info("new host_group (" + hgk + ") added") + + for rhgk in _resource.host_groups.keys(): + if rhgk not in self.host_groups.keys(): + host_group = _resource.host_groups[rhgk] + host_group.status = "disabled" + host_group.mark_host_group_updated(rhgk) + + self.logger.info("host_group (" + rhgk + ") disabled") + + # TODO(Gueyoung): what if host exists in topology, + # but does not in resource (DB or platform)? 
+ + for rhk in _resource.hosts.keys(): + if not _resource.hosts[rhk].is_available(): + continue + + if rhk not in self.hosts.keys(): + _resource.hosts[rhk].status = "disabled" + _resource.mark_host_updated(rhk) + + self.logger.info("host (" + rhk + ") removed from topology") + + if self._is_datacenter_updated(_resource): + _resource.datacenter.updated = True + + for hgk in self.host_groups.keys(): + hg = self.host_groups[hgk] + + if self._is_host_group_updated(hg, _resource): + _resource.mark_host_group_updated(hgk) + + for hk in self.hosts.keys(): + if hk in _resource.hosts.keys(): + if not _resource.hosts[hk].is_available(): + continue + + host = self.hosts[hk] + + if self._is_host_updated(host, _resource): + _resource.mark_host_updated(hk) + + # TODO(Gueyoung): Hierachical failure propagation + + def _is_datacenter_updated(self, _resource): + """Check if datacenter's resources are changed.""" + + updated = False + + _rdatacenter = _resource.datacenter + + for rk in self.datacenter.resources.keys(): + + h = None + if rk in _resource.host_groups.keys(): + h = _resource.host_groups[rk] + elif rk in _resource.hosts.keys(): + h = _resource.hosts[rk] + + if h is not None and h.is_available(): + if rk not in _rdatacenter.resources.keys() or h.updated: + _rdatacenter.resources[rk] = h + updated = True + + self.logger.info("datacenter updated (new resource)") + + for rk in _rdatacenter.resources.keys(): + + h = None + if rk in _resource.host_groups.keys(): + h = _resource.host_groups[rk] + elif rk in _resource.hosts.keys(): + h = _resource.hosts[rk] + + if h is None or \ + not h.is_available() or \ + rk not in self.datacenter.resources.keys(): + del _rdatacenter.resources[rk] + updated = True + + self.logger.info("datacenter updated (resource removed)") + + return updated + + def _is_host_group_updated(self, _hg, _resource): + """Check if host_group's parent or children are changed.""" + + updated = False + + _rhg = _resource.host_groups[_hg.name] + + if _hg.host_type != _rhg.host_type: + _rhg.host_type = _hg.host_type + updated = True + self.logger.info("host_group (" + _rhg.name + ") updated (hosting type)") + + if _rhg.parent_resource is None or \ + _rhg.parent_resource.name != _hg.parent_resource.name: + if _hg.parent_resource.name in _resource.host_groups.keys(): + hg = _resource.host_groups[_hg.parent_resource.name] + if hg.is_available(): + _rhg.parent_resource = hg + updated = True + elif _hg.parent_resource.name == _resource.datacenter.name: + _rhg.parent_resource = _resource.datacenter + updated = True + + if updated: + self.logger.info("host_group (" + _rhg.name + ") updated (parent host_group)") + + for rk in _hg.child_resources.keys(): + + h = None + if rk in _resource.hosts.keys(): + h = _resource.hosts[rk] + elif rk in _resource.host_groups.keys(): + h = _resource.host_groups[rk] + + if h is not None and h.is_available(): + if rk not in _rhg.child_resources.keys() or h.updated: + _rhg.child_resources[rk] = h + updated = True + + self.logger.info("host_group (" + _rhg.name + ") updated (new child host)") + + for rk in _rhg.child_resources.keys(): + + h = None + if rk in _resource.hosts.keys(): + h = _resource.hosts[rk] + elif rk in _resource.host_groups.keys(): + h = _resource.host_groups[rk] + + if h is None or \ + not h.is_available() or \ + rk not in _hg.child_resources.keys(): + del _rhg.child_resources[rk] + updated = True + + self.logger.info("host_group (" + _rhg.name + ") updated (child host removed)") + + return updated + + def _is_host_updated(self, _host, _resource): + 
"""Check if host's parent (e.g., rack) is changed.""" + + updated = False + + _rhost = _resource.hosts[_host.name] + + if _rhost.host_group is None or \ + _rhost.host_group.name != _host.host_group.name: + if _host.host_group.name in _resource.host_groups.keys(): + rhost_group = _resource.host_groups[_host.host_group.name] + if rhost_group.is_available(): + _rhost.host_group = rhost_group + updated = True + elif _host.host_group.name == _resource.datacenter.name: + _rhost.host_group = _resource.datacenter + updated = True + + if updated: + self.logger.info("host (" + _rhost.name + ") updated (host_group)") + + return False |