Diffstat (limited to 'engine')
-rw-r--r--   engine/src/valet/engine/search/__init__.py          |  18
-rw-r--r--   engine/src/valet/engine/search/avail_resources.py   |  76
-rw-r--r--   engine/src/valet/engine/search/constraint_solver.py | 117
-rw-r--r--   engine/src/valet/engine/search/optimizer.py         | 494
-rw-r--r--   engine/src/valet/engine/search/resource.py          | 264
-rw-r--r--   engine/src/valet/engine/search/search.py            | 708
-rw-r--r--   engine/src/valet/engine/search/search_helper.py     |  43
7 files changed, 1720 insertions(+), 0 deletions(-)
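
Before the per-file diffs: the new search.py ranks both the servers to be placed and the candidate hosts by weighted resource scores (see _set_resource_weights, _set_node_weight, and _set_compute_sort_base below). The snippet that follows is a minimal, self-contained sketch of that scoring scheme only; every function and field name in it is an invented stand-in for illustration, not part of the Valet code added in this commit.

# Illustrative sketch of the weighting used in search.py; all names here are
# invented stand-ins, not the classes or attributes added in this commit.

def normalize_weights(priorities):
    # priorities: list of (resource_type, raw_weight) pairs,
    # analogous to app.optimization_priority in the diff below.
    total = float(sum(w for _, w in priorities))
    return {t: w / total for t, w in priorities}

def node_sort_key(node, weights):
    # Servers/groups demanding more of the prioritized resources are placed first.
    return (weights.get("cpu", 0.0) * node["vcpu_weight"]
            + weights.get("mem", 0.0) * node["mem_weight"]
            + weights.get("lvol", 0.0) * node["disk_weight"])

def host_sort_key(host, totals, weights):
    # Candidate hosts are ranked by remaining-capacity ratios discounted by
    # (1 - weight) per resource; the smallest score is tried first (bin-packing).
    return ((1.0 - weights["cpu"]) * host["avail_vcpus"] / float(totals["vcpus"])
            + (1.0 - weights["mem"]) * host["avail_mem"] / float(totals["mem"])
            + (1.0 - weights["lvol"]) * host["avail_disk"] / float(totals["disk"]))

if __name__ == "__main__":
    weights = normalize_weights([("cpu", 3.0), ("mem", 2.0), ("lvol", 1.0)])
    nodes = [
        {"name": "small", "vcpu_weight": 0.2, "mem_weight": 0.1, "disk_weight": 0.1},
        {"name": "large", "vcpu_weight": 0.9, "mem_weight": 0.7, "disk_weight": 0.3},
    ]
    nodes.sort(key=lambda n: node_sort_key(n, weights), reverse=True)
    print([n["name"] for n in nodes])  # ['large', 'small']
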
diff --git a/engine/src/valet/engine/search/__init__.py b/engine/src/valet/engine/search/__init__.py new file mode 100644 index 0000000..bd50995 --- /dev/null +++ b/engine/src/valet/engine/search/__init__.py @@ -0,0 +1,18 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# diff --git a/engine/src/valet/engine/search/avail_resources.py b/engine/src/valet/engine/search/avail_resources.py new file mode 100644 index 0000000..c4484b8 --- /dev/null +++ b/engine/src/valet/engine/search/avail_resources.py @@ -0,0 +1,76 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +#!/bin/python + + +from valet.engine.app_manager.group import LEVEL + + +class AvailResources(object): + """Container to keep hosting resources and candidate resources + + of each level (host or rack) for search. 
+ """ + + def __init__(self, _level): + self.level = _level + self.avail_hosts = {} + self.candidates = {} + + def set_next_level(self): + """Get the next level to search.""" + + current_level_index = LEVEL.index(self.level) + next_level_index = current_level_index - 1 + + if next_level_index < 0: + self.level = LEVEL[0] + else: + self.level = LEVEL[next_level_index] + + def set_next_avail_hosts(self, _avail_hosts, _resource_of_level): + """Set the next level of available hosting resources.""" + + for hk, h in _avail_hosts.iteritems(): + if self.level == "rack": + if h.rack_name == _resource_of_level: + self.avail_hosts[hk] = h + elif self.level == "host": + if h.host_name == _resource_of_level: + self.avail_hosts[hk] = h + + def set_candidates(self): + if self.level == "rack": + for _, h in self.avail_hosts.iteritems(): + self.candidates[h.rack_name] = h + elif self.level == "host": + self.candidates = self.avail_hosts + + def get_candidate(self, _resource): + candidate = None + + if self.level == "rack": + for _, h in self.avail_hosts.iteritems(): + if h.rack_name == _resource.rack_name: + candidate = h + elif self.level == "host": + if _resource.host_name in self.avail_hosts.keys(): + candidate = self.avail_hosts[_resource.host_name] + + return candidate diff --git a/engine/src/valet/engine/search/constraint_solver.py b/engine/src/valet/engine/search/constraint_solver.py new file mode 100644 index 0000000..2593db8 --- /dev/null +++ b/engine/src/valet/engine/search/constraint_solver.py @@ -0,0 +1,117 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +from valet.engine.search.filters.affinity_filter import AffinityFilter +from valet.engine.search.filters.aggregate_instance_filter import AggregateInstanceExtraSpecsFilter +from valet.engine.search.filters.az_filter import AvailabilityZoneFilter +from valet.engine.search.filters.cpu_filter import CPUFilter +from valet.engine.search.filters.disk_filter import DiskFilter +from valet.engine.search.filters.diversity_filter import DiversityFilter +from valet.engine.search.filters.dynamic_aggregate_filter import DynamicAggregateFilter +from valet.engine.search.filters.exclusivity_filter import ExclusivityFilter +from valet.engine.search.filters.mem_filter import MemFilter +from valet.engine.search.filters.no_exclusivity_filter import NoExclusivityFilter +from valet.engine.search.filters.numa_filter import NUMAFilter +from valet.engine.search.filters.quorum_diversity_filter import QuorumDiversityFilter + + +class ConstraintSolver(object): + """Constraint solver to filter out candidate hosts.""" + + def __init__(self, _logger): + """Define fileters and application order.""" + + self.logger = _logger + + self.filter_list = [] + + # TODO(Gueyoung): add soft-affinity and soft-diversity filters + + # TODO(Gueyoung): the order of applying filters? 
+ + # Apply platform filters first + self.filter_list.append(AvailabilityZoneFilter()) + self.filter_list.append(AggregateInstanceExtraSpecsFilter()) + self.filter_list.append(CPUFilter()) + self.filter_list.append(MemFilter()) + self.filter_list.append(DiskFilter()) + self.filter_list.append(NUMAFilter()) + + # Apply Valet filters next + self.filter_list.append(DiversityFilter()) + self.filter_list.append(QuorumDiversityFilter()) + self.filter_list.append(ExclusivityFilter()) + self.filter_list.append(NoExclusivityFilter()) + self.filter_list.append(AffinityFilter()) + + # Apply dynamic aggregate filter to determine the host's aggregate + # in a lazy way. + self.filter_list.append(DynamicAggregateFilter()) + + self.status = "ok" + + def get_candidate_list(self, _n, _avail_resources, _avail_hosts, _avail_groups): + """Filter candidate hosts using a list of filters.""" + + level = _avail_resources.level + + candidate_list = [] + + # This is the resource which name is 'any' + ghost_candidate = None + + for _, r in _avail_resources.candidates.iteritems(): + candidate_list.append(r) + + if r.get_resource_name(level) == "any": + ghost_candidate = r + + if len(candidate_list) == 0: + self.status = "no candidate for node = " + _n.vid + self.logger.warning(self.status) + return [] + + for f in self.filter_list: + f.init_condition() + + if not f.check_pre_condition(level, _n, _avail_hosts, _avail_groups): + if f.status is not None: + self.status = f.status + self.logger.error(self.status) + return [] + else: + self.logger.debug("skip " + f.name + " constraint for node = " + _n.vid) + + continue + + candidate_list = f.filter_candidates(level, _n, candidate_list) + + if ghost_candidate and ghost_candidate not in candidate_list: + candidate_list.append(ghost_candidate) + + if len(candidate_list) == 0: + self.status = "violate " + level + " " + f.name + " constraint for node = " + _n.vid + if f.status is not None: + self.status += " detail: " + f.status + self.logger.debug(self.status) + return [] + elif len(candidate_list) > 0: + str_num = str(len(candidate_list)) + self.logger.debug("pass " + f.name + " constraint for node = " + _n.vid + " with " + str_num) + + return candidate_list diff --git a/engine/src/valet/engine/search/optimizer.py b/engine/src/valet/engine/search/optimizer.py new file mode 100644 index 0000000..de45cee --- /dev/null +++ b/engine/src/valet/engine/search/optimizer.py @@ -0,0 +1,494 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# ------------------------------------------------------------------------- +# +from valet.engine.app_manager.group import Group +from valet.engine.app_manager.server import Server +from valet.engine.search.search import Search + + +class Optimizer(object): + """Optimizer to compute the optimal placements.""" + + def __init__(self, _logger): + self.logger = _logger + + self.search = Search(self.logger) + + def place(self, _app): + """Scheduling placements given app.""" + + _app.set_weight() + _app.set_optimization_priority() + + if self.search.place(_app) is True: + if _app.status == "ok": + self._set_app(_app, "create") + + self._set_resource(_app) + if _app.status != "ok": + return + else: + if _app.status == "ok": + _app.status = "failed" + + self._rollback_placements(_app) + + def update(self, _app): + """Update state of current placements.""" + + if _app.state == "delete": + self._update_app_for_delete(_app) + + self._update_resource(_app) + if _app.status != "ok": + return + else: + _app.status = "unknown state while updating app" + + def confirm(self, _app): + """Confirm prior request.""" + + if _app.state == "created": + self._update_app(_app) + if _app.status != "ok": + return + + self._update_resource(_app) + if _app.status != "ok": + return + elif _app.state == "deleted": + self._remove_resource(_app) + if _app.status != "ok": + return + else: + _app.status = "unknown state while updating app" + return + + def rollback(self, _app): + """Rollback prior decision.""" + + if _app.state == "created": + self._update_app(_app) + if _app.status != "ok": + return + + self._update_resource(_app) + if _app.status != "ok": + return + elif _app.state == "deleted": + self._remove_resource(_app) + if _app.status != "ok": + return + else: + _app.status = "unknown state while updating app" + + def _set_app(self, _app, _state): + """Update with assigned hosts.""" + + for v, p in self.search.node_placements.iteritems(): + if isinstance(v, Server): + v.host = p.host_name + if p.rack_name != "any": + v.host_group = p.rack_name + + host = self.search.avail_hosts[p.host_name] + + s_info = {} + if _app.app_id is None or _app.app_id == "none": + s_info["stack_id"] = "none" + else: + s_info["stack_id"] = _app.app_id + s_info["stack_name"] = _app.app_name + s_info["uuid"] = "none" + s_info["name"] = v.name + + v.numa = host.NUMA.pop_cell_of_server(s_info) + + v.state = _state + + # Put back servers from groups. + _app.reset_servers() + + def _update_app(self, _app): + """Update state of servers.""" + + for sk, s in _app.servers.iteritems(): + if s["host"] == "none": + continue + + s["state"] = _app.state + + host_name = s.get("host") + + host = None + if host_name in _app.resource.hosts.keys(): + host = _app.resource.hosts[host_name] + + s_info = {} + if _app.app_id is None or _app.app_id == "none": + s_info["stack_id"] = "none" + else: + s_info["stack_id"] = _app.app_id + s_info["stack_name"] = _app.app_name + s_info["uuid"] = "none" + s_info["name"] = s.get("name") + + # Check if the prior placements changed. + if host is None or \ + not host.is_available() or \ + not host.has_server(s_info): + _app.status = "server (" + sk + ") placement has been changed" + self.logger.error(_app.status) + + def _update_app_for_delete(self, _app): + """Check the prior placements and update state + + And update placements if they have been changed. 
+ """ + + for sk, s in _app.servers.iteritems(): + if s["host"] == "none": + continue + + s["state"] = _app.state + + host_name = s.get("host") + + host = None + if host_name in _app.resource.hosts.keys(): + host = _app.resource.hosts[host_name] + + s_info = {} + if _app.app_id is None or _app.app_id == "none": + s_info["stack_id"] = "none" + else: + s_info["stack_id"] = _app.app_id + s_info["stack_name"] = _app.app_name + s_info["uuid"] = "none" + s_info["name"] = s.get("name") + + # Check if the prior placements changed. + if host is None or \ + not host.is_available() or \ + not host.has_server(s_info): + self.logger.warning("server (" + sk + ") placement has been changed") + + new_host = _app.resource.get_host_of_server(s_info) + + if new_host is not None: + s["host"] = new_host.name + else: + s["host"] = "none" + self.logger.warning("server (" + sk + ") not exists") + + def _set_resource(self, _app): + """Update resource status based on new placements.""" + + # If host's type (i.e., host-aggregate) is not determined before, + # Convert/set host's type to one as specified in VM. + for v, p in self.search.node_placements.iteritems(): + if isinstance(v, Server): + # The host object p was deep copied, so use original object. + rh = self.search.avail_hosts[p.host_name] + + if rh.old_candidate_host_types is not None and len(rh.old_candidate_host_types) > 0: + flavor_type_list = v.get_flavor_types() + ha = self.search.avail_groups[flavor_type_list[0]] + + self._convert_host(rh, + ha.name, + rh.get_host_type(ha, rh.old_candidate_host_types), + _app.resource) + + placements = {} + + for v, p in self.search.node_placements.iteritems(): + if isinstance(v, Server): + s_info = {} + + if _app.app_id is None or _app.app_id == "none": + s_info["stack_id"] = "none" + else: + s_info["stack_id"] = _app.app_id + s_info["stack_name"] = _app.app_name + + s_info["uuid"] = "none" + s_info["orch_id"] = v.orch_id + s_info["name"] = v.name + + s_info["flavor_id"] = v.flavor + s_info["vcpus"] = v.vCPUs + s_info["mem"] = v.mem + s_info["disk"] = v.local_volume_size + s_info["numa"] = v.numa + + s_info["image_id"] = v.image + s_info["tenant_id"] = _app.tenant_id + + s_info["state"] = v.state + s_info["status"] = "valid" + + placements[v.vid] = {} + placements[v.vid]["new_host"] = p.host_name + placements[v.vid]["info"] = s_info + + # Update compute host with new servers + if not _app.resource.update_server_placements(change_of_placements=placements): + _app.status = "fail while updating server placements" + return + + groups = {} + + for v, p in self.search.node_placements.iteritems(): + if isinstance(v, Server): + rh = self.search.avail_hosts[p.host_name] + + for gk, g in rh.host_memberships.iteritems(): + if g.factory in ("valet", "server-group"): + if g.level == "host": + _app.resource.add_group(gk, + g.group_type, + g.level, + g.factory, + rh.host_name) + + if rh.rack_name != "any": + for gk, g in rh.rack_memberships.iteritems(): + if g.factory in ("valet", "server-group"): + if g.level == "rack": + _app.resource.add_group(gk, + g.group_type, + g.level, + g.factory, + rh.rack_name) + + s_info = placements[v.vid].get("info") + + self._collect_groups_of_server(v, s_info, groups) + + # Update groups with new servers + _app.resource.update_server_grouping(change_of_placements=placements, + new_groups=groups) + + _app.resource.update_resource() + + def _convert_host(self, _rhost, _ha_name, _host_type, _resource): + """Convert host's type into the specific type as given.""" + + host = 
_resource.hosts[_rhost.host_name] + + if host.candidate_host_types is None or len(host.candidate_host_types) == 0: + return + + host.vCPUs = _host_type["vCPUs"] + host.original_vCPUs = _host_type["original_vCPUs"] + host.avail_vCPUs = _host_type["avail_vCPUs"] + host.mem_cap = _host_type["mem"] + host.original_mem_cap = _host_type["original_mem"] + host.avail_mem_cap = _host_type["avail_mem"] + host.local_disk_cap = _host_type["local_disk"] + host.original_local_disk_cap = _host_type["original_local_disk"] + host.avail_local_disk_cap = _host_type["avail_local_disk"] + host.vCPUs_used = _host_type["vCPUs_used"] + host.free_mem_mb = _host_type["free_mem_mb"] + host.free_disk_gb = _host_type["free_disk_gb"] + host.disk_available_least = _host_type["disk_available_least"] + + host.NUMA = _rhost.NUMA + + ha = _resource.groups[_ha_name] + host.memberships[ha.name] = ha + ha.member_hosts[host.name] = [] + ha.updated = True + + _resource.mark_host_updated(host.name) + + _resource.update_resource() + + if host.candidate_host_types is not None: + host.candidate_host_types.clear() + + def _rollback_placements(self, _app): + """Remove placements when they fail. + + Remove placements from NUMA cells of resource object. + """ + + for v, p in self.search.node_placements.iteritems(): + if isinstance(v, Server): + s_info = {} + + if _app.app_id is None or _app.app_id == "none": + s_info["stack_id"] = "none" + else: + s_info["stack_id"] = _app.app_id + s_info["stack_name"] = _app.app_name + + s_info["uuid"] = "none" + s_info["orch_id"] = v.orch_id + s_info["name"] = v.name + + s_info["flavor_id"] = v.flavor + s_info["vcpus"] = v.vCPUs + s_info["mem"] = v.mem + s_info["disk"] = v.local_volume_size + s_info["numa"] = v.numa + + s_info["image_id"] = v.image + s_info["tenant_id"] = _app.tenant_id + + s_info["state"] = v.state + s_info["status"] = "valid" + + host = _app.resource.hosts[p.host_name] + host.NUMA.rollback_server_resources(s_info) + + def _collect_groups_of_server(self, _v, _s_info, _groups): + """Collect all groups of the server and its parent (affinity).""" + + # TODO(Gueyoung): track host-aggregates and availability-zone? + + for gk in _v.exclusivity_groups.keys(): + if gk not in _groups.keys(): + _groups[gk] = [] + _groups[gk].append(_s_info) + + for gk in _v.diversity_groups.keys(): + if gk not in _groups.keys(): + _groups[gk] = [] + _groups[gk].append(_s_info) + + for gk in _v.quorum_diversity_groups.keys(): + if gk not in _groups.keys(): + _groups[gk] = [] + _groups[gk].append(_s_info) + + if isinstance(_v, Group): + if _v.vid not in _groups.keys(): + _groups[_v.vid] = [] + _groups[_v.vid].append(_s_info) + + # Recursively check server or its affinity group. + if _v.surgroup is not None: + self._collect_groups_of_server(_v.surgroup, _s_info, _groups) + + def _remove_resource(self, _app): + """Remove servers from resources. + + Resources: NUMA, host, host_group, datacenter, + valet groups and server-groups. 
+ """ + + placements = {} + + for sk, s in _app.servers.iteritems(): + if s["host"] == "none": + continue + + s_info = {} + + if _app.app_id is None or _app.app_id == "none": + s_info["stack_id"] = "none" + else: + s_info["stack_id"] = _app.app_id + s_info["stack_name"] = _app.app_name + + s_info["uuid"] = "none" + s_info["name"] = s.get("name") + + s_info["flavor_id"] = s.get("flavor") + s_info["vcpus"] = s.get("cpus") + s_info["mem"] = s.get("mem") + s_info["disk"] = s.get("local_volume") + s_info["numa"] = s.get("numa") + + s_info["image_id"] = s.get("image") + s_info["tenant_id"] = _app.tenant_id + + s_info["state"] = "deleted" + + if s_info["stack_id"] != "none": + sid = s_info["stack_id"] + ":" + s_info["name"] + else: + sid = s_info["stack_name"] + ":" + s_info["name"] + + placements[sid] = {} + placements[sid]["old_host"] = s.get("host") + placements[sid]["info"] = s_info + + if not _app.resource.update_server_placements(change_of_placements=placements): + _app.status = "fail while updating server placements" + return + + _app.resource.update_server_grouping(change_of_placements=placements, + new_groups={}) + + _app.resource.update_resource() + + def _update_resource(self, _app): + """Update state of servers in resources. + + Resources: NUMA, host, host_group, datacenter, + valet groups and server-groups. + """ + + placements = {} + + for sk, s in _app.servers.iteritems(): + if s["host"] == "none": + continue + + s_info = {} + + if _app.app_id is None or _app.app_id == "none": + s_info["stack_id"] = "none" + else: + s_info["stack_id"] = _app.app_id + s_info["stack_name"] = _app.app_name + + s_info["uuid"] = "none" + s_info["name"] = s.get("name") + + s_info["flavor_id"] = "none" + s_info["vcpus"] = -1 + s_info["mem"] = -1 + s_info["disk"] = -1 + s_info["numa"] = "none" + + s_info["image_id"] = "none" + + s_info["state"] = s.get("state") + s_info["status"] = "none" + + if s_info["stack_id"] != "none": + sid = s_info["stack_id"] + ":" + s_info["name"] + else: + sid = s_info["stack_name"] + ":" + s_info["name"] + + placements[sid] = {} + placements[sid]["host"] = s.get("host") + placements[sid]["info"] = s_info + + if not _app.resource.update_server_placements(change_of_placements=placements): + _app.status = "fail while updating server placements" + return + + _app.resource.update_server_grouping(change_of_placements=placements, + new_groups={}) + + _app.resource.update_resource() diff --git a/engine/src/valet/engine/search/resource.py b/engine/src/valet/engine/search/resource.py new file mode 100644 index 0000000..18c8ca7 --- /dev/null +++ b/engine/src/valet/engine/search/resource.py @@ -0,0 +1,264 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# ------------------------------------------------------------------------- +# +from valet.engine.resource_manager.resources.numa import NUMA + + +class GroupResource(object): + """Container for all resource group includes + + affinity, diversity, quorum-diversity, exclusivity, host-aggregate and availability. + """ + + def __init__(self): + self.name = None + + self.group_type = "aggr" + self.factory = "nova" + self.level = "host" + + self.metadata = {} + + self.original_num_of_placed_servers = 0 + self.num_of_placed_servers = 0 + + # key = host (host or rack), value = num_of_placed_servers + self.num_of_placed_servers_of_host = {} + + +class HostResource(object): + """Container for hosting resource (host, rack).""" + + def __init__(self): + # Host info + self.host_name = None + + self.host_memberships = {} # all mapped groups to host + + self.host_avail_vCPUs = 0 # remaining vCPUs after overcommit + self.host_avail_mem = 0 # remaining mem cap after + self.host_avail_local_disk = 0 # remaining local disk cap after overcommit + + self.NUMA = None + + self.host_num_of_placed_servers = 0 # the number of vms currently placed in this host + + # If the host type is not determined yet, + # provide possible host types. + self.candidate_host_types = {} + self.old_candidate_host_types = {} # For rollback + + # To track newly added host types. + self.new_host_aggregate_list = [] + + # Rack info + self.rack_name = None # where this host is located + + self.rack_memberships = {} + + self.rack_avail_vCPUs = 0 + self.rack_avail_mem = 0 + self.rack_avail_local_disk = 0 + + self.rack_num_of_placed_servers = 0 + + # To track newly added host types. + self.new_rack_aggregate_list = [] + + self.level = None # level of placement + + self.sort_base = 0 # order to place + + def get_host_type(self, _ha, _host_types): + """Take host-aggregate group and + return default host type of the host-aggregate. + """ + + host_type = None + + if _host_types is None: + return host_type + + host_type_list = _host_types[_ha.name] + for ht in host_type_list: + if "default" in ht.keys(): + host_type = ht + break + + return host_type + + def adjust_avail_resources(self, _ha): + """Take host-aggregate group and + add it to host/rack memberships and + adjust the amount of available resources based on + the corresponding host type. 
+ """ + + if _ha.name not in self.host_memberships.keys(): + self.host_memberships[_ha.name] = _ha + self.new_host_aggregate_list.append(_ha.name) + if _ha.name not in self.rack_memberships.keys(): + self.rack_memberships[_ha.name] = _ha + self.new_rack_aggregate_list.append(_ha.name) + + host_type = self.get_host_type(_ha, self.candidate_host_types) + + self.host_avail_vCPUs = host_type["avail_vCPUs"] + self.host_avail_mem = host_type["avail_mem"] + self.host_avail_local_disk = host_type["avail_local_disk"] + + self.NUMA = NUMA(numa=host_type["NUMA"]) + + if self.candidate_host_types is not None: + for htk, htl in self.candidate_host_types.iteritems(): + if htk == "mockup": + self.rack_avail_vCPUs -= htl[0]["avail_vCPUs"] + self.rack_avail_mem -= htl[0]["avail_mem"] + self.rack_avail_local_disk -= htl[0]["avail_local_disk"] + + self.rack_avail_vCPUs += self.host_avail_vCPUs + self.rack_avail_mem += self.host_avail_mem + self.rack_avail_local_disk += self.host_avail_local_disk + + break + + def adjust_avail_rack_resources(self, _ha, _cpus, _mem, _disk): + """Take host-aggregate group and the amount of available resources + add the group into rack membership and + adjust the amount of available rack resources. + """ + + if _ha.name not in self.rack_memberships.keys(): + self.rack_memberships[_ha.name] = _ha + self.new_rack_aggregate_list.append(_ha.name) + + self.rack_avail_vCPUs = _cpus + self.rack_avail_mem = _mem + self.rack_avail_local_disk = _disk + + def rollback_avail_resources(self, _ha): + if _ha.name in self.new_host_aggregate_list: + del self.host_memberships[_ha.name] + self.new_host_aggregate_list.remove(_ha.name) + if _ha.name in self.new_rack_aggregate_list: + del self.rack_memberships[_ha.name] + self.new_rack_aggregate_list.remove(_ha.name) + + host_type = self.get_host_type(_ha, self.old_candidate_host_types) + + if self.old_candidate_host_types is not None: + for htk, htl in self.old_candidate_host_types.iteritems(): + if htk == "mockup": + self.host_avail_vCPUs = htl[0]["avail_vCPUs"] + self.host_avail_mem = htl[0]["avail_mem"] + self.host_avail_local_disk = htl[0]["avail_local_disk"] + + self.NUMA = NUMA(numa=htl[0]["NUMA"]) + + self.rack_avail_vCPUs -= host_type["avail_vCPUs"] + self.rack_avail_mem -= host_type["avail_mem"] + self.rack_avail_local_disk -= host_type["avail_local_disk"] + + self.rack_avail_vCPUs += self.host_avail_vCPUs + self.rack_avail_mem += self.host_avail_mem + self.rack_avail_local_disk += self.host_avail_local_disk + + break + + def rollback_avail_rack_resources(self, _ha, _cpus, _mem, _disk): + if _ha.name in self.new_rack_aggregate_list: + del self.rack_memberships[_ha.name] + self.new_rack_aggregate_list.remove(_ha.name) + + self.rack_avail_vCPUs = _cpus + self.rack_avail_mem = _mem + self.rack_avail_local_disk = _disk + + def get_resource_name(self, _level): + name = "unknown" + + if _level == "rack": + name = self.rack_name + elif _level == "host": + name = self.host_name + + return name + + def get_vcpus(self, _level): + avail_vcpus = 0 + + if _level == "rack": + avail_vcpus = self.rack_avail_vCPUs + elif _level == "host": + avail_vcpus = self.host_avail_vCPUs + + return avail_vcpus + + def get_mem(self, _level): + avail_mem = 0 + + if _level == "rack": + avail_mem = self.rack_avail_mem + elif _level == "host": + avail_mem = self.host_avail_mem + + return avail_mem + + def get_local_disk(self, _level): + avail_local_disk = 0 + + if _level == "rack": + avail_local_disk = self.rack_avail_local_disk + elif _level == "host": + avail_local_disk = 
self.host_avail_local_disk + + return avail_local_disk + + def get_memberships(self, _level): + memberships = None + + if _level == "rack": + memberships = self.rack_memberships + elif _level == "host": + memberships = self.host_memberships + + return memberships + + def get_all_memberships(self, _level): + memberships = {} + + if _level == "rack": + for mk, m in self.rack_memberships.iteritems(): + memberships[mk] = m + for mk, m in self.host_memberships.iteritems(): + memberships[mk] = m + elif _level == "host": + for mk, m in self.host_memberships.iteritems(): + memberships[mk] = m + + return memberships + + def get_num_of_placed_servers(self, _level): + num_of_servers = 0 + + if _level == "rack": + num_of_servers = self.rack_num_of_placed_servers + elif _level == "host": + num_of_servers = self.host_num_of_placed_servers + + return num_of_servers diff --git a/engine/src/valet/engine/search/search.py b/engine/src/valet/engine/search/search.py new file mode 100644 index 0000000..40added --- /dev/null +++ b/engine/src/valet/engine/search/search.py @@ -0,0 +1,708 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +import copy +import operator + +from valet.engine.app_manager.server import Server +from valet.engine.resource_manager.resources.datacenter import Datacenter +from valet.engine.search.avail_resources import AvailResources +from valet.engine.search.constraint_solver import ConstraintSolver +from valet.engine.search.resource import GroupResource, HostResource +from valet.engine.search.search_helper import * + + +class Search(object): + """Bin-packing approach in the hierachical datacenter layout.""" + + def __init__(self, _logger): + self.logger = _logger + + # Search inputs + self.app = None + self.resource = None + + # Snapshot of current resource status + self.avail_hosts = {} + self.avail_groups = {} + + # Search results + self.node_placements = {} + self.prior_placements = {} # TODO + self.num_of_hosts = 0 + + # Optimization criteria + self.CPU_weight = -1 + self.mem_weight = -1 + self.local_disk_weight = -1 + + self.constraint_solver = None + + def _init_search(self, _app): + """Init the search information and the output results.""" + + self.app = _app + self.resource = _app.resource + + self.avail_hosts.clear() + self.avail_groups.clear() + + self.node_placements.clear() + self.prior_placements.clear() # TODO + self.num_of_hosts = 0 + + self.CPU_weight = -1 + self.mem_weight = -1 + self.local_disk_weight = -1 + + self.constraint_solver = ConstraintSolver(self.logger) + + self._create_avail_groups() + self._create_avail_hosts() + + # TODO + # if len(self.app.old_vm_map) > 0: + # self._adjust_resources() + + self._set_resource_weights() + + def _create_avail_groups(self): + """Collect all available resource groups. 
+ + Group type is affinity, diversity, quorum-diversity, exclusivity, + availability-zone, host-aggregate, server-group. + """ + + for gk, g in self.resource.groups.iteritems(): + if g.status != "enabled": + self.logger.debug("group (" + g.name + ") disabled") + continue + + gr = GroupResource() + gr.name = gk + + gr.group_type = g.group_type + gr.factory = g.factory + + if g.level is not None: + gr.level = g.level + else: + gr.level = "host" + + for mk, mv in g.metadata.iteritems(): + gr.metadata[mk] = mv + + gr.original_num_of_placed_servers = len(g.server_list) + gr.num_of_placed_servers = len(g.server_list) + + for hk in g.member_hosts.keys(): + gr.num_of_placed_servers_of_host[hk] = len(g.member_hosts[hk]) + + self.avail_groups[gk] = gr + + def _create_avail_hosts(self): + """Create all available hosts.""" + + for hk, host in self.resource.hosts.iteritems(): + if not host.is_available(): + self.logger.warning("host (" + host.name + ") not available at this time") + continue + + hr = HostResource() + hr.host_name = hk + + for mk in host.memberships.keys(): + if mk in self.avail_groups.keys(): + hr.host_memberships[mk] = self.avail_groups[mk] + + # Not used by Valet, only capacity planning + try: + for htk, ht in host.candidate_host_types.iteritems(): + hr.candidate_host_types[htk] = copy.deepcopy(ht) + except AttributeError: + hr.candidate_host_types = {} + + hr.host_avail_vCPUs = host.avail_vCPUs + hr.host_avail_mem = host.avail_mem_cap + hr.host_avail_local_disk = host.avail_local_disk_cap + + hr.NUMA = host.NUMA # NOTE: refer to host's NUMA, instead of deepcopy. + + hr.host_num_of_placed_servers = len(host.server_list) + + rack = host.host_group + if isinstance(rack, Datacenter): + hr.rack_name = "any" + else: + if not rack.is_available(): + continue + + hr.rack_name = rack.name + + for mk in rack.memberships.keys(): + if mk in self.avail_groups.keys(): + hr.rack_memberships[mk] = self.avail_groups[mk] + + hr.rack_avail_vCPUs = rack.avail_vCPUs + hr.rack_avail_mem = rack.avail_mem_cap + hr.rack_avail_local_disk = rack.avail_local_disk_cap + + hr.rack_num_of_placed_servers = len(rack.server_list) + + if hr.host_num_of_placed_servers > 0: + self.num_of_hosts += 1 + + self.avail_hosts[hk] = hr + + def _set_resource_weights(self): + """Compute weight of each resource type. + + As larger weight, as more important resource to be considered. + """ + + denominator = 0.0 + for _, w in self.app.optimization_priority: + denominator += w + + for t, w in self.app.optimization_priority: + if t == "cpu": + self.CPU_weight = float(w / denominator) + elif t == "mem": + self.mem_weight = float(w / denominator) + elif t == "lvol": + self.local_disk_weight = float(w / denominator) + + def place(self, _app): + """Determine placements of new app creation.""" + + self._init_search(_app) + + self.logger.info("search......") + + open_node_list = self._open_list(self.app.servers, self.app.groups) + + avail_resources = AvailResources(LEVEL[len(LEVEL) - 1]) + avail_resources.avail_hosts = self.avail_hosts + avail_resources.set_next_level() # NOTE(Gueyoung): skip 'cluster' level + + return self._run_greedy(open_node_list, avail_resources, "new") + + # TODO: for update opt. 
+ def re_place(self, _app_topology): + pass + + def _re_place(self): + pass + + def _open_list(self, _servers, _groups): + """Extract all servers and groups of each level (rack, host).""" + + open_node_list = [] + + for _, s in _servers.iteritems(): + self._set_node_weight(s) + open_node_list.append(s) + + for _, g in _groups.iteritems(): + self._set_node_weight(g) + open_node_list.append(g) + + return open_node_list + + def _set_node_weight(self, _v): + """Compute each server's or group's weight. + + As larger weight, as more important one to be considered. + """ + + _v.sort_base = -1 + _v.sort_base = self.CPU_weight * _v.vCPU_weight + _v.sort_base += self.mem_weight * _v.mem_weight + _v.sort_base += self.local_disk_weight * _v.local_volume_weight + + # TODO: for update opt. + def _open_prior_list(self, _vms, _groups): + pass + + def _adjust_resources(self): + pass + + def _run_greedy(self, _open_node_list, _avail_resources, _mode): + """Search placements with greedy algorithm.""" + + _open_node_list.sort(key=operator.attrgetter("sort_base"), reverse=True) + + for n in _open_node_list: + self.logger.debug("open node = " + n.vid + " cpus = " + str(n.vCPUs) + " sort = " + str(n.sort_base)) + + while len(_open_node_list) > 0: + n = _open_node_list.pop(0) + + # TODO + # if _mode == "new": + best_resource = self._get_best_resource(n, _avail_resources, _mode) + # else: + # best_resource = self._get_best_resource_for_prior(n, _avail_resources, _mode) + + if best_resource is None: + self.logger.error(self.app.status) + return False + else: + self._deduct_resources(_avail_resources.level, best_resource, n, _mode) + # TODO + # if _mode == "new": + self._close_node_placement(_avail_resources.level, best_resource, n) + # else: + # self._close_prior_placement(_avail_resources.level, best_resource, n) + + return True + + def _get_best_resource(self, _n, _avail_resources, _mode): + """Ddetermine the best placement for given server or affinity group.""" + + candidate_list = [] + prior_resource = None + + # If this is already placed one + if _n in self.prior_placements.keys(): + prior_resource = _avail_resources.get_candidate(self.prior_placements[_n]) + candidate_list.append(prior_resource) + else: + # TODO: need a list of candidates given as input? 
+ + _avail_resources.set_candidates() + + candidate_list = self.constraint_solver.get_candidate_list(_n, + _avail_resources, + self.avail_hosts, + self.avail_groups) + + if len(candidate_list) == 0: + if self.app.status == "ok": + if self.constraint_solver.status != "ok": + self.app.status = self.constraint_solver.status + else: + self.app.status = "fail while getting candidate hosts" + return None + + if len(candidate_list) > 1: + self._set_compute_sort_base(_avail_resources.level, candidate_list) + candidate_list.sort(key=operator.attrgetter("sort_base")) + + for c in candidate_list: + rn = c.get_resource_name(_avail_resources.level) + avail_cpus = c.get_vcpus(_avail_resources.level) + self.logger.debug("candidate = " + rn + " cpus = " + str(avail_cpus) + " sort = " + str(c.sort_base)) + + best_resource = None + if _avail_resources.level == "host" and isinstance(_n, Server): + best_resource = copy.deepcopy(candidate_list[0]) + best_resource.level = "host" + else: + while len(candidate_list) > 0: + cr = candidate_list.pop(0) + + (servers, groups) = get_next_placements(_n, _avail_resources.level) + open_node_list = self._open_list(servers, groups) + + avail_resources = AvailResources(_avail_resources.level) + resource_name = cr.get_resource_name(_avail_resources.level) + + avail_resources.set_next_avail_hosts(_avail_resources.avail_hosts, resource_name) + avail_resources.set_next_level() + + # Recursive call + if self._run_greedy(open_node_list, avail_resources, _mode): + best_resource = copy.deepcopy(cr) + best_resource.level = _avail_resources.level + break + else: + if prior_resource is None: + self.logger.warning("rollback candidate = " + resource_name) + + self._rollback_resources(_n) + self._rollback_node_placement(_n) + + # TODO(Gueyoung): how to track the original error status? + if len(candidate_list) > 0 and self.app.status != "ok": + self.app.status = "ok" + else: + break + + if best_resource is None and len(candidate_list) == 0: + if self.app.status == "ok": + self.app.status = "no available hosts" + self.logger.warning(self.app.status) + + return best_resource + + # TODO: for update opt. + def _get_best_resource_for_prior(self, _n, _avail_resources, _mode): + pass + + def _set_compute_sort_base(self, _level, _candidate_list): + """Compute the weight of each candidate host.""" + + for c in _candidate_list: + cpu_ratio = -1 + mem_ratio = -1 + local_disk_ratio = -1 + + if _level == "rack": + cpu_ratio = float(c.rack_avail_vCPUs) / float(self.resource.CPU_avail) + mem_ratio = float(c.rack_avail_mem) / float(self.resource.mem_avail) + local_disk_ratio = float(c.rack_avail_local_disk) / float(self.resource.local_disk_avail) + elif _level == "host": + cpu_ratio = float(c.host_avail_vCPUs) / float(self.resource.CPU_avail) + mem_ratio = float(c.host_avail_mem) / float(self.resource.mem_avail) + local_disk_ratio = float(c.host_avail_local_disk) / float(self.resource.local_disk_avail) + + c.sort_base = (1.0 - self.CPU_weight) * cpu_ratio + \ + (1.0 - self.mem_weight) * mem_ratio + \ + (1.0 - self.local_disk_weight) * local_disk_ratio + + def _deduct_resources(self, _level, _best, _n, _mode): + """Apply new placement to hosting resources and groups.""" + + # Check if the chosen host is already applied. 
+ if _mode == "new": + if _n in self.prior_placements.keys(): + return + else: + if _n in self.node_placements.keys(): + if _best.level == self.node_placements[_n].level: + return + else: + if _n in self.prior_placements.keys(): + if _best.level == self.prior_placements[_n].level: + return + + # Apply this placement to valet groups. + + exclusivities = _n.get_exclusivities(_level) + if len(exclusivities) == 1: + exclusivity_group = exclusivities[exclusivities.keys()[0]] + self._add_exclusivity(_best, exclusivity_group) + + if isinstance(_n, Group): + self._add_group(_level, _best, _n) + + if len(_n.diversity_groups) > 0: + for _, div_group in _n.diversity_groups.iteritems(): + self._add_group(_level, _best, div_group) + + if len(_n.quorum_diversity_groups) > 0: + for _, div_group in _n.quorum_diversity_groups.iteritems(): + self._add_group(_level, _best, div_group) + + # Apply this placement to hosting resources. + if isinstance(_n, Server) and _level == "host": + self._deduct_server_resources(_best, _n) + + def _add_exclusivity(self, _best, _group): + """Add new exclusivity group.""" + + if _group.vid not in self.avail_groups.keys(): + gr = GroupResource() + gr.name = _group.vid + gr.group_type = "exclusivity" + gr.factory = "valet" + gr.level = _group.level + self.avail_groups[gr.name] = gr + + self.logger.info("find exclusivity (" + _group.vid + ")") + else: + gr = self.avail_groups[_group.vid] + + gr.num_of_placed_servers += 1 + + host_name = _best.get_resource_name(_group.level) + if host_name not in gr.num_of_placed_servers_of_host.keys(): + gr.num_of_placed_servers_of_host[host_name] = 0 + gr.num_of_placed_servers_of_host[host_name] += 1 + + chosen_host = self.avail_hosts[_best.host_name] + if _group.level == "host": + if _group.vid not in chosen_host.host_memberships.keys(): + chosen_host.host_memberships[_group.vid] = gr + for _, np in self.avail_hosts.iteritems(): + if chosen_host.rack_name != "any" and np.rack_name == chosen_host.rack_name: + if _group.vid not in np.rack_memberships.keys(): + np.rack_memberships[_group.vid] = gr + else: # Rack level + for _, np in self.avail_hosts.iteritems(): + if chosen_host.rack_name != "any" and np.rack_name == chosen_host.rack_name: + if _group.vid not in np.rack_memberships.keys(): + np.rack_memberships[_group.vid] = gr + + def _add_group(self, _level, _best, _group): + """Add new valet group.""" + + if _group.vid not in self.avail_groups.keys(): + gr = GroupResource() + gr.name = _group.vid + gr.group_type = _group.group_type + gr.factory = _group.factory + gr.level = _group.level + self.avail_groups[gr.name] = gr + + self.logger.info("find " + _group.group_type + " (" + _group.vid + ")") + else: + gr = self.avail_groups[_group.vid] + + if _group.level == _level: + gr.num_of_placed_servers += 1 + + host_name = _best.get_resource_name(_level) + if host_name not in gr.num_of_placed_servers_of_host.keys(): + gr.num_of_placed_servers_of_host[host_name] = 0 + gr.num_of_placed_servers_of_host[host_name] += 1 + + chosen_host = self.avail_hosts[_best.host_name] + if _level == "host": + if _group.vid not in chosen_host.host_memberships.keys(): + chosen_host.host_memberships[_group.vid] = gr + for _, np in self.avail_hosts.iteritems(): + if chosen_host.rack_name != "any" and np.rack_name == chosen_host.rack_name: + if _group.vid not in np.rack_memberships.keys(): + np.rack_memberships[_group.vid] = gr + else: # Rack level + for _, np in self.avail_hosts.iteritems(): + if chosen_host.rack_name != "any" and np.rack_name == chosen_host.rack_name: + if 
_group.vid not in np.rack_memberships.keys(): + np.rack_memberships[_group.vid] = gr + + def _deduct_server_resources(self, _best, _n): + """Apply the reduced amount of resources to the chosen host. + + _n is a server and _best is a compute host. + """ + + chosen_host = self.avail_hosts[_best.host_name] + + chosen_host.host_avail_vCPUs -= _n.vCPUs + chosen_host.host_avail_mem -= _n.mem + chosen_host.host_avail_local_disk -= _n.local_volume_size + + # Apply placement decision into NUMA + if _n.need_numa_alignment(): + s_info = {} + s_info["stack_id"] = "none" + s_info["stack_name"] = self.app.app_name + s_info["uuid"] = "none" + s_info["name"] = _n.name + s_info["vcpus"] = _n.vCPUs + s_info["mem"] = _n.mem + + chosen_host.NUMA.deduct_server_resources(s_info) + + # TODO: need non_NUMA server? + # else: + # chosen_host.NUMA.apply_cpus_fairly(_n.vCPUs) + # chosen_host.NUMA.apply_mem_fairly(_n.mem) + + if chosen_host.host_num_of_placed_servers == 0: + self.num_of_hosts += 1 + + chosen_host.host_num_of_placed_servers += 1 + + for _, np in self.avail_hosts.iteritems(): + if chosen_host.rack_name != "any" and np.rack_name == chosen_host.rack_name: + np.rack_avail_vCPUs -= _n.vCPUs + np.rack_avail_mem -= _n.mem + np.rack_avail_local_disk -= _n.local_volume_size + + np.rack_num_of_placed_servers += 1 + + def _close_node_placement(self, _level, _best, _v): + """Record the final placement decision.""" + + if _v not in self.node_placements.keys() and _v not in self.prior_placements.keys(): + if _level == "host" or isinstance(_v, Group): + self.node_placements[_v] = _best + + def _close_prior_placement(self, _level, _best, _v): + """Set the decision for placed server or group.""" + + if _v not in self.prior_placements.keys(): + if _level == "host" or isinstance(_v, Group): + self.prior_placements[_v] = _best + + def _rollback_resources(self, _v): + """Rollback the placement.""" + + if isinstance(_v, Server): + self._rollback_server_resources(_v) + elif isinstance(_v, Group): + for _, v in _v.subgroups.iteritems(): + self._rollback_resources(v) + + if _v in self.node_placements.keys(): + chosen_host = self.avail_hosts[self.node_placements[_v].host_name] + level = self.node_placements[_v].level + + if isinstance(_v, Group): + self._remove_group(chosen_host, _v, level) + + exclusivities = _v.get_exclusivities(level) + if len(exclusivities) == 1: + ex_group = exclusivities[exclusivities.keys()[0]] + self._remove_exclusivity(chosen_host, ex_group) + + if len(_v.diversity_groups) > 0: + for _, div_group in _v.diversity_groups.iteritems(): + self._remove_group(chosen_host, div_group, level) + + if len(_v.quorum_diversity_groups) > 0: + for _, div_group in _v.quorum_diversity_groups.iteritems(): + self._remove_group(chosen_host, div_group, level) + + def _remove_exclusivity(self, _chosen_host, _group): + """Remove the exclusivity group.""" + + gr = self.avail_groups[_group.vid] + + host_name = _chosen_host.get_resource_name(_group.level) + + gr.num_of_placed_servers -= 1 + gr.num_of_placed_servers_of_host[host_name] -= 1 + + if gr.num_of_placed_servers_of_host[host_name] == 0: + del gr.num_of_placed_servers_of_host[host_name] + + if gr.num_of_placed_servers == 0: + del self.avail_groups[_group.vid] + + if _group.level == "host": + if _chosen_host.host_num_of_placed_servers == 0 and \ + _group.vid in _chosen_host.host_memberships.keys(): + del _chosen_host.host_memberships[_group.vid] + + for _, np in self.avail_hosts.iteritems(): + if _chosen_host.rack_name != "any" and np.rack_name == _chosen_host.rack_name: + 
if _group.vid in np.rack_memberships.keys(): + del np.rack_memberships[_group.vid] + else: # Rack level + if _chosen_host.rack_num_of_placed_servers == 0: + for _, np in self.avail_hosts.iteritems(): + if _chosen_host.rack_name != "any" and np.rack_name == _chosen_host.rack_name: + if _group.vid in np.rack_memberships.keys(): + del np.rack_memberships[_group.vid] + + def _remove_group(self, _chosen_host, _group, _level): + """Remove valet group.""" + + if _group.level == _level: + gr = self.avail_groups[_group.vid] + + host_name = _chosen_host.get_resource_name(_level) + + gr.num_of_placed_servers -= 1 + gr.num_of_placed_servers_of_host[host_name] -= 1 + + if gr.num_of_placed_servers_of_host[host_name] == 0: + del gr.num_of_placed_servers_of_host[host_name] + + if gr.num_of_placed_servers == 0: + del self.avail_groups[_group.vid] + + exist_group = True + if _group.vid not in self.avail_groups.keys(): + exist_group = False + else: + if host_name not in gr.num_of_placed_servers_of_host.keys(): + exist_group = False + + if _level == "host": + if not exist_group and _group.vid in _chosen_host.host_memberships.keys(): + del _chosen_host.host_memberships[_group.vid] + + for _, np in self.avail_hosts.iteritems(): + if _chosen_host.rack_name != "any" and np.rack_name == _chosen_host.rack_name: + if _group.vid in np.rack_memberships.keys(): + del np.rack_memberships[_group.vid] + else: # Rack level + if not exist_group: + for _, np in self.avail_hosts.iteritems(): + if _chosen_host.rack_name != "any" and np.rack_name == _chosen_host.rack_name: + if _group.vid in np.rack_memberships.keys(): + del np.rack_memberships[_group.vid] + + def _rollback_server_resources(self, _v): + """Return back the amount of resources to host.""" + + if _v in self.node_placements.keys(): + chosen_host = self.avail_hosts[self.node_placements[_v].host_name] + + chosen_host.host_avail_vCPUs += _v.vCPUs + chosen_host.host_avail_mem += _v.mem + chosen_host.host_avail_local_disk += _v.local_volume_size + + # Apply rollback into NUMA + if _v.need_numa_alignment(): + s_info = {} + s_info["stack_id"] = "none" + s_info["stack_name"] = self.app.app_name + s_info["uuid"] = "none" + s_info["name"] = _v.name + s_info["vcpus"] = _v.vCPUs + s_info["mem"] = _v.mem + + chosen_host.NUMA.rollback_server_resources(s_info) + + chosen_host.host_num_of_placed_servers -= 1 + + if chosen_host.host_num_of_placed_servers == 0: + self.num_of_hosts -= 1 + + for _, np in self.avail_hosts.iteritems(): + if chosen_host.rack_name != "any" and np.rack_name == chosen_host.rack_name: + np.rack_avail_vCPUs += _v.vCPUs + np.rack_avail_mem += _v.mem + np.rack_avail_local_disk += _v.local_volume_size + + np.rack_num_of_placed_servers -= 1 + + # If the chosen host was a new host and its host type was unknown, + # rollback to the original unknown state. 
+ if chosen_host.host_num_of_placed_servers == 0: + if chosen_host.old_candidate_host_types is not None and len(chosen_host.old_candidate_host_types) > 0: + flavor_type_list = _v.get_flavor_types() + ha = self.avail_groups[flavor_type_list[0]] + + chosen_host.rollback_avail_resources(ha) + chosen_host.candidate_host_types = copy.deepcopy(chosen_host.old_candidate_host_types) + chosen_host.old_candidate_host_types.clear() + + for hrk, hr in self.avail_hosts.iteritems(): + if hrk != chosen_host.host_name: + if hr.rack_name == chosen_host.rack_name: + hr.rollback_avail_rack_resources(ha, + chosen_host.rack_avail_vCPUs, + chosen_host.rack_avail_mem, + chosen_host.rack_avail_local_disk) + + def _rollback_node_placement(self, _v): + """Remove placement decisions.""" + + if _v in self.node_placements.keys(): + del self.node_placements[_v] + + if isinstance(_v, Group): + for _, sg in _v.subgroups.iteritems(): + self._rollback_node_placement(sg) diff --git a/engine/src/valet/engine/search/search_helper.py b/engine/src/valet/engine/search/search_helper.py new file mode 100644 index 0000000..0e64ef7 --- /dev/null +++ b/engine/src/valet/engine/search/search_helper.py @@ -0,0 +1,43 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +#!/bin/python + + +from valet.engine.app_manager.group import Group, LEVEL + + +def get_next_placements(_n, _level): + """Get servers and groups to be handled in the next level of search.""" + + servers = {} + groups = {} + + if isinstance(_n, Group): + if LEVEL.index(_n.level) < LEVEL.index(_level): + groups[_n.vid] = _n + else: + for _, sg in _n.subgroups.iteritems(): + if isinstance(sg, Group): + groups[sg.vid] = sg + else: + servers[sg.vid] = sg + else: + servers[_n.vid] = _n + + return servers, groups |
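
For context on the filter pipeline added in constraint_solver.py: filters are applied in a fixed order, a ghost candidate whose resource name is "any" is re-added after every pass, and the search reports a violation as soon as a filter empties the candidate list. The sketch below shows that pattern in a self-contained form; MinVcpuFilter and run_filters are made-up stand-ins, not the filter classes imported by the new module.

# Stand-alone illustration of the filter-chain pattern used by
# ConstraintSolver.get_candidate_list(); MinVcpuFilter is a made-up stand-in.

class MinVcpuFilter(object):
    name = "min-vcpu"

    def __init__(self, required):
        self.required = required

    def filter_candidates(self, candidates):
        return [c for c in candidates if c["avail_vcpus"] >= self.required]

def run_filters(candidates, filters, ghost=None):
    """Apply each filter in order, keeping the ghost 'any' candidate alive."""
    for f in filters:
        candidates = f.filter_candidates(candidates)
        if ghost is not None and ghost not in candidates:
            candidates.append(ghost)
        if not candidates:
            return []  # constraint violated; the caller reports f.name
    return candidates

if __name__ == "__main__":
    hosts = [{"name": "h1", "avail_vcpus": 2}, {"name": "h2", "avail_vcpus": 8}]
    survivors = run_filters(hosts, [MinVcpuFilter(4)])
    print([h["name"] for h in survivors])  # ['h2']
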