summaryrefslogtreecommitdiffstats
path: root/engine
diff options
context:
space:
mode:
Diffstat (limited to 'engine')
-rw-r--r--engine/src/valet/engine/search/__init__.py18
-rw-r--r--engine/src/valet/engine/search/avail_resources.py76
-rw-r--r--engine/src/valet/engine/search/constraint_solver.py117
-rw-r--r--engine/src/valet/engine/search/optimizer.py494
-rw-r--r--engine/src/valet/engine/search/resource.py264
-rw-r--r--engine/src/valet/engine/search/search.py708
-rw-r--r--engine/src/valet/engine/search/search_helper.py43
7 files changed, 1720 insertions, 0 deletions
diff --git a/engine/src/valet/engine/search/__init__.py b/engine/src/valet/engine/search/__init__.py
new file mode 100644
index 0000000..bd50995
--- /dev/null
+++ b/engine/src/valet/engine/search/__init__.py
@@ -0,0 +1,18 @@
+#
+# -------------------------------------------------------------------------
+# Copyright (c) 2019 AT&T Intellectual Property
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
diff --git a/engine/src/valet/engine/search/avail_resources.py b/engine/src/valet/engine/search/avail_resources.py
new file mode 100644
index 0000000..c4484b8
--- /dev/null
+++ b/engine/src/valet/engine/search/avail_resources.py
@@ -0,0 +1,76 @@
+#
+# -------------------------------------------------------------------------
+# Copyright (c) 2019 AT&T Intellectual Property
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
+#!/bin/python
+
+
+from valet.engine.app_manager.group import LEVEL
+
+
+class AvailResources(object):
+ """Container to keep hosting resources and candidate resources
+
+ of each level (host or rack) for search.
+ """
+
+ def __init__(self, _level):
+ self.level = _level
+ self.avail_hosts = {}
+ self.candidates = {}
+
+ def set_next_level(self):
+ """Get the next level to search."""
+
+ current_level_index = LEVEL.index(self.level)
+ next_level_index = current_level_index - 1
+
+ if next_level_index < 0:
+ self.level = LEVEL[0]
+ else:
+ self.level = LEVEL[next_level_index]
+
+ def set_next_avail_hosts(self, _avail_hosts, _resource_of_level):
+ """Set the next level of available hosting resources."""
+
+ for hk, h in _avail_hosts.iteritems():
+ if self.level == "rack":
+ if h.rack_name == _resource_of_level:
+ self.avail_hosts[hk] = h
+ elif self.level == "host":
+ if h.host_name == _resource_of_level:
+ self.avail_hosts[hk] = h
+
+ def set_candidates(self):
+ if self.level == "rack":
+ for _, h in self.avail_hosts.iteritems():
+ self.candidates[h.rack_name] = h
+ elif self.level == "host":
+ self.candidates = self.avail_hosts
+
+ def get_candidate(self, _resource):
+ candidate = None
+
+ if self.level == "rack":
+ for _, h in self.avail_hosts.iteritems():
+ if h.rack_name == _resource.rack_name:
+ candidate = h
+ elif self.level == "host":
+ if _resource.host_name in self.avail_hosts.keys():
+ candidate = self.avail_hosts[_resource.host_name]
+
+ return candidate
diff --git a/engine/src/valet/engine/search/constraint_solver.py b/engine/src/valet/engine/search/constraint_solver.py
new file mode 100644
index 0000000..2593db8
--- /dev/null
+++ b/engine/src/valet/engine/search/constraint_solver.py
@@ -0,0 +1,117 @@
+#
+# -------------------------------------------------------------------------
+# Copyright (c) 2019 AT&T Intellectual Property
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
+from valet.engine.search.filters.affinity_filter import AffinityFilter
+from valet.engine.search.filters.aggregate_instance_filter import AggregateInstanceExtraSpecsFilter
+from valet.engine.search.filters.az_filter import AvailabilityZoneFilter
+from valet.engine.search.filters.cpu_filter import CPUFilter
+from valet.engine.search.filters.disk_filter import DiskFilter
+from valet.engine.search.filters.diversity_filter import DiversityFilter
+from valet.engine.search.filters.dynamic_aggregate_filter import DynamicAggregateFilter
+from valet.engine.search.filters.exclusivity_filter import ExclusivityFilter
+from valet.engine.search.filters.mem_filter import MemFilter
+from valet.engine.search.filters.no_exclusivity_filter import NoExclusivityFilter
+from valet.engine.search.filters.numa_filter import NUMAFilter
+from valet.engine.search.filters.quorum_diversity_filter import QuorumDiversityFilter
+
+
+class ConstraintSolver(object):
+ """Constraint solver to filter out candidate hosts."""
+
+ def __init__(self, _logger):
+ """Define fileters and application order."""
+
+ self.logger = _logger
+
+ self.filter_list = []
+
+ # TODO(Gueyoung): add soft-affinity and soft-diversity filters
+
+ # TODO(Gueyoung): the order of applying filters?
+
+ # Apply platform filters first
+ self.filter_list.append(AvailabilityZoneFilter())
+ self.filter_list.append(AggregateInstanceExtraSpecsFilter())
+ self.filter_list.append(CPUFilter())
+ self.filter_list.append(MemFilter())
+ self.filter_list.append(DiskFilter())
+ self.filter_list.append(NUMAFilter())
+
+ # Apply Valet filters next
+ self.filter_list.append(DiversityFilter())
+ self.filter_list.append(QuorumDiversityFilter())
+ self.filter_list.append(ExclusivityFilter())
+ self.filter_list.append(NoExclusivityFilter())
+ self.filter_list.append(AffinityFilter())
+
+ # Apply dynamic aggregate filter to determine the host's aggregate
+ # in a lazy way.
+ self.filter_list.append(DynamicAggregateFilter())
+
+ self.status = "ok"
+
+ def get_candidate_list(self, _n, _avail_resources, _avail_hosts, _avail_groups):
+ """Filter candidate hosts using a list of filters."""
+
+ level = _avail_resources.level
+
+ candidate_list = []
+
+ # This is the resource which name is 'any'
+ ghost_candidate = None
+
+ for _, r in _avail_resources.candidates.iteritems():
+ candidate_list.append(r)
+
+ if r.get_resource_name(level) == "any":
+ ghost_candidate = r
+
+ if len(candidate_list) == 0:
+ self.status = "no candidate for node = " + _n.vid
+ self.logger.warning(self.status)
+ return []
+
+ for f in self.filter_list:
+ f.init_condition()
+
+ if not f.check_pre_condition(level, _n, _avail_hosts, _avail_groups):
+ if f.status is not None:
+ self.status = f.status
+ self.logger.error(self.status)
+ return []
+ else:
+ self.logger.debug("skip " + f.name + " constraint for node = " + _n.vid)
+
+ continue
+
+ candidate_list = f.filter_candidates(level, _n, candidate_list)
+
+ if ghost_candidate and ghost_candidate not in candidate_list:
+ candidate_list.append(ghost_candidate)
+
+ if len(candidate_list) == 0:
+ self.status = "violate " + level + " " + f.name + " constraint for node = " + _n.vid
+ if f.status is not None:
+ self.status += " detail: " + f.status
+ self.logger.debug(self.status)
+ return []
+ elif len(candidate_list) > 0:
+ str_num = str(len(candidate_list))
+ self.logger.debug("pass " + f.name + " constraint for node = " + _n.vid + " with " + str_num)
+
+ return candidate_list
diff --git a/engine/src/valet/engine/search/optimizer.py b/engine/src/valet/engine/search/optimizer.py
new file mode 100644
index 0000000..de45cee
--- /dev/null
+++ b/engine/src/valet/engine/search/optimizer.py
@@ -0,0 +1,494 @@
+#
+# -------------------------------------------------------------------------
+# Copyright (c) 2019 AT&T Intellectual Property
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
+from valet.engine.app_manager.group import Group
+from valet.engine.app_manager.server import Server
+from valet.engine.search.search import Search
+
+
+class Optimizer(object):
+ """Optimizer to compute the optimal placements."""
+
+ def __init__(self, _logger):
+ self.logger = _logger
+
+ self.search = Search(self.logger)
+
+ def place(self, _app):
+ """Scheduling placements given app."""
+
+ _app.set_weight()
+ _app.set_optimization_priority()
+
+ if self.search.place(_app) is True:
+ if _app.status == "ok":
+ self._set_app(_app, "create")
+
+ self._set_resource(_app)
+ if _app.status != "ok":
+ return
+ else:
+ if _app.status == "ok":
+ _app.status = "failed"
+
+ self._rollback_placements(_app)
+
+ def update(self, _app):
+ """Update state of current placements."""
+
+ if _app.state == "delete":
+ self._update_app_for_delete(_app)
+
+ self._update_resource(_app)
+ if _app.status != "ok":
+ return
+ else:
+ _app.status = "unknown state while updating app"
+
+ def confirm(self, _app):
+ """Confirm prior request."""
+
+ if _app.state == "created":
+ self._update_app(_app)
+ if _app.status != "ok":
+ return
+
+ self._update_resource(_app)
+ if _app.status != "ok":
+ return
+ elif _app.state == "deleted":
+ self._remove_resource(_app)
+ if _app.status != "ok":
+ return
+ else:
+ _app.status = "unknown state while updating app"
+ return
+
+ def rollback(self, _app):
+ """Rollback prior decision."""
+
+ if _app.state == "created":
+ self._update_app(_app)
+ if _app.status != "ok":
+ return
+
+ self._update_resource(_app)
+ if _app.status != "ok":
+ return
+ elif _app.state == "deleted":
+ self._remove_resource(_app)
+ if _app.status != "ok":
+ return
+ else:
+ _app.status = "unknown state while updating app"
+
+ def _set_app(self, _app, _state):
+ """Update with assigned hosts."""
+
+ for v, p in self.search.node_placements.iteritems():
+ if isinstance(v, Server):
+ v.host = p.host_name
+ if p.rack_name != "any":
+ v.host_group = p.rack_name
+
+ host = self.search.avail_hosts[p.host_name]
+
+ s_info = {}
+ if _app.app_id is None or _app.app_id == "none":
+ s_info["stack_id"] = "none"
+ else:
+ s_info["stack_id"] = _app.app_id
+ s_info["stack_name"] = _app.app_name
+ s_info["uuid"] = "none"
+ s_info["name"] = v.name
+
+ v.numa = host.NUMA.pop_cell_of_server(s_info)
+
+ v.state = _state
+
+ # Put back servers from groups.
+ _app.reset_servers()
+
+ def _update_app(self, _app):
+ """Update state of servers."""
+
+ for sk, s in _app.servers.iteritems():
+ if s["host"] == "none":
+ continue
+
+ s["state"] = _app.state
+
+ host_name = s.get("host")
+
+ host = None
+ if host_name in _app.resource.hosts.keys():
+ host = _app.resource.hosts[host_name]
+
+ s_info = {}
+ if _app.app_id is None or _app.app_id == "none":
+ s_info["stack_id"] = "none"
+ else:
+ s_info["stack_id"] = _app.app_id
+ s_info["stack_name"] = _app.app_name
+ s_info["uuid"] = "none"
+ s_info["name"] = s.get("name")
+
+ # Check if the prior placements changed.
+ if host is None or \
+ not host.is_available() or \
+ not host.has_server(s_info):
+ _app.status = "server (" + sk + ") placement has been changed"
+ self.logger.error(_app.status)
+
+ def _update_app_for_delete(self, _app):
+ """Check the prior placements and update state
+
+ And update placements if they have been changed.
+ """
+
+ for sk, s in _app.servers.iteritems():
+ if s["host"] == "none":
+ continue
+
+ s["state"] = _app.state
+
+ host_name = s.get("host")
+
+ host = None
+ if host_name in _app.resource.hosts.keys():
+ host = _app.resource.hosts[host_name]
+
+ s_info = {}
+ if _app.app_id is None or _app.app_id == "none":
+ s_info["stack_id"] = "none"
+ else:
+ s_info["stack_id"] = _app.app_id
+ s_info["stack_name"] = _app.app_name
+ s_info["uuid"] = "none"
+ s_info["name"] = s.get("name")
+
+ # Check if the prior placements changed.
+ if host is None or \
+ not host.is_available() or \
+ not host.has_server(s_info):
+ self.logger.warning("server (" + sk + ") placement has been changed")
+
+ new_host = _app.resource.get_host_of_server(s_info)
+
+ if new_host is not None:
+ s["host"] = new_host.name
+ else:
+ s["host"] = "none"
+ self.logger.warning("server (" + sk + ") not exists")
+
+ def _set_resource(self, _app):
+ """Update resource status based on new placements."""
+
+ # If host's type (i.e., host-aggregate) is not determined before,
+ # Convert/set host's type to one as specified in VM.
+ for v, p in self.search.node_placements.iteritems():
+ if isinstance(v, Server):
+ # The host object p was deep copied, so use original object.
+ rh = self.search.avail_hosts[p.host_name]
+
+ if rh.old_candidate_host_types is not None and len(rh.old_candidate_host_types) > 0:
+ flavor_type_list = v.get_flavor_types()
+ ha = self.search.avail_groups[flavor_type_list[0]]
+
+ self._convert_host(rh,
+ ha.name,
+ rh.get_host_type(ha, rh.old_candidate_host_types),
+ _app.resource)
+
+ placements = {}
+
+ for v, p in self.search.node_placements.iteritems():
+ if isinstance(v, Server):
+ s_info = {}
+
+ if _app.app_id is None or _app.app_id == "none":
+ s_info["stack_id"] = "none"
+ else:
+ s_info["stack_id"] = _app.app_id
+ s_info["stack_name"] = _app.app_name
+
+ s_info["uuid"] = "none"
+ s_info["orch_id"] = v.orch_id
+ s_info["name"] = v.name
+
+ s_info["flavor_id"] = v.flavor
+ s_info["vcpus"] = v.vCPUs
+ s_info["mem"] = v.mem
+ s_info["disk"] = v.local_volume_size
+ s_info["numa"] = v.numa
+
+ s_info["image_id"] = v.image
+ s_info["tenant_id"] = _app.tenant_id
+
+ s_info["state"] = v.state
+ s_info["status"] = "valid"
+
+ placements[v.vid] = {}
+ placements[v.vid]["new_host"] = p.host_name
+ placements[v.vid]["info"] = s_info
+
+ # Update compute host with new servers
+ if not _app.resource.update_server_placements(change_of_placements=placements):
+ _app.status = "fail while updating server placements"
+ return
+
+ groups = {}
+
+ for v, p in self.search.node_placements.iteritems():
+ if isinstance(v, Server):
+ rh = self.search.avail_hosts[p.host_name]
+
+ for gk, g in rh.host_memberships.iteritems():
+ if g.factory in ("valet", "server-group"):
+ if g.level == "host":
+ _app.resource.add_group(gk,
+ g.group_type,
+ g.level,
+ g.factory,
+ rh.host_name)
+
+ if rh.rack_name != "any":
+ for gk, g in rh.rack_memberships.iteritems():
+ if g.factory in ("valet", "server-group"):
+ if g.level == "rack":
+ _app.resource.add_group(gk,
+ g.group_type,
+ g.level,
+ g.factory,
+ rh.rack_name)
+
+ s_info = placements[v.vid].get("info")
+
+ self._collect_groups_of_server(v, s_info, groups)
+
+ # Update groups with new servers
+ _app.resource.update_server_grouping(change_of_placements=placements,
+ new_groups=groups)
+
+ _app.resource.update_resource()
+
+ def _convert_host(self, _rhost, _ha_name, _host_type, _resource):
+ """Convert host's type into the specific type as given."""
+
+ host = _resource.hosts[_rhost.host_name]
+
+ if host.candidate_host_types is None or len(host.candidate_host_types) == 0:
+ return
+
+ host.vCPUs = _host_type["vCPUs"]
+ host.original_vCPUs = _host_type["original_vCPUs"]
+ host.avail_vCPUs = _host_type["avail_vCPUs"]
+ host.mem_cap = _host_type["mem"]
+ host.original_mem_cap = _host_type["original_mem"]
+ host.avail_mem_cap = _host_type["avail_mem"]
+ host.local_disk_cap = _host_type["local_disk"]
+ host.original_local_disk_cap = _host_type["original_local_disk"]
+ host.avail_local_disk_cap = _host_type["avail_local_disk"]
+ host.vCPUs_used = _host_type["vCPUs_used"]
+ host.free_mem_mb = _host_type["free_mem_mb"]
+ host.free_disk_gb = _host_type["free_disk_gb"]
+ host.disk_available_least = _host_type["disk_available_least"]
+
+ host.NUMA = _rhost.NUMA
+
+ ha = _resource.groups[_ha_name]
+ host.memberships[ha.name] = ha
+ ha.member_hosts[host.name] = []
+ ha.updated = True
+
+ _resource.mark_host_updated(host.name)
+
+ _resource.update_resource()
+
+ if host.candidate_host_types is not None:
+ host.candidate_host_types.clear()
+
+ def _rollback_placements(self, _app):
+ """Remove placements when they fail.
+
+ Remove placements from NUMA cells of resource object.
+ """
+
+ for v, p in self.search.node_placements.iteritems():
+ if isinstance(v, Server):
+ s_info = {}
+
+ if _app.app_id is None or _app.app_id == "none":
+ s_info["stack_id"] = "none"
+ else:
+ s_info["stack_id"] = _app.app_id
+ s_info["stack_name"] = _app.app_name
+
+ s_info["uuid"] = "none"
+ s_info["orch_id"] = v.orch_id
+ s_info["name"] = v.name
+
+ s_info["flavor_id"] = v.flavor
+ s_info["vcpus"] = v.vCPUs
+ s_info["mem"] = v.mem
+ s_info["disk"] = v.local_volume_size
+ s_info["numa"] = v.numa
+
+ s_info["image_id"] = v.image
+ s_info["tenant_id"] = _app.tenant_id
+
+ s_info["state"] = v.state
+ s_info["status"] = "valid"
+
+ host = _app.resource.hosts[p.host_name]
+ host.NUMA.rollback_server_resources(s_info)
+
+ def _collect_groups_of_server(self, _v, _s_info, _groups):
+ """Collect all groups of the server and its parent (affinity)."""
+
+ # TODO(Gueyoung): track host-aggregates and availability-zone?
+
+ for gk in _v.exclusivity_groups.keys():
+ if gk not in _groups.keys():
+ _groups[gk] = []
+ _groups[gk].append(_s_info)
+
+ for gk in _v.diversity_groups.keys():
+ if gk not in _groups.keys():
+ _groups[gk] = []
+ _groups[gk].append(_s_info)
+
+ for gk in _v.quorum_diversity_groups.keys():
+ if gk not in _groups.keys():
+ _groups[gk] = []
+ _groups[gk].append(_s_info)
+
+ if isinstance(_v, Group):
+ if _v.vid not in _groups.keys():
+ _groups[_v.vid] = []
+ _groups[_v.vid].append(_s_info)
+
+ # Recursively check server or its affinity group.
+ if _v.surgroup is not None:
+ self._collect_groups_of_server(_v.surgroup, _s_info, _groups)
+
+ def _remove_resource(self, _app):
+ """Remove servers from resources.
+
+ Resources: NUMA, host, host_group, datacenter,
+ valet groups and server-groups.
+ """
+
+ placements = {}
+
+ for sk, s in _app.servers.iteritems():
+ if s["host"] == "none":
+ continue
+
+ s_info = {}
+
+ if _app.app_id is None or _app.app_id == "none":
+ s_info["stack_id"] = "none"
+ else:
+ s_info["stack_id"] = _app.app_id
+ s_info["stack_name"] = _app.app_name
+
+ s_info["uuid"] = "none"
+ s_info["name"] = s.get("name")
+
+ s_info["flavor_id"] = s.get("flavor")
+ s_info["vcpus"] = s.get("cpus")
+ s_info["mem"] = s.get("mem")
+ s_info["disk"] = s.get("local_volume")
+ s_info["numa"] = s.get("numa")
+
+ s_info["image_id"] = s.get("image")
+ s_info["tenant_id"] = _app.tenant_id
+
+ s_info["state"] = "deleted"
+
+ if s_info["stack_id"] != "none":
+ sid = s_info["stack_id"] + ":" + s_info["name"]
+ else:
+ sid = s_info["stack_name"] + ":" + s_info["name"]
+
+ placements[sid] = {}
+ placements[sid]["old_host"] = s.get("host")
+ placements[sid]["info"] = s_info
+
+ if not _app.resource.update_server_placements(change_of_placements=placements):
+ _app.status = "fail while updating server placements"
+ return
+
+ _app.resource.update_server_grouping(change_of_placements=placements,
+ new_groups={})
+
+ _app.resource.update_resource()
+
+ def _update_resource(self, _app):
+ """Update state of servers in resources.
+
+ Resources: NUMA, host, host_group, datacenter,
+ valet groups and server-groups.
+ """
+
+ placements = {}
+
+ for sk, s in _app.servers.iteritems():
+ if s["host"] == "none":
+ continue
+
+ s_info = {}
+
+ if _app.app_id is None or _app.app_id == "none":
+ s_info["stack_id"] = "none"
+ else:
+ s_info["stack_id"] = _app.app_id
+ s_info["stack_name"] = _app.app_name
+
+ s_info["uuid"] = "none"
+ s_info["name"] = s.get("name")
+
+ s_info["flavor_id"] = "none"
+ s_info["vcpus"] = -1
+ s_info["mem"] = -1
+ s_info["disk"] = -1
+ s_info["numa"] = "none"
+
+ s_info["image_id"] = "none"
+
+ s_info["state"] = s.get("state")
+ s_info["status"] = "none"
+
+ if s_info["stack_id"] != "none":
+ sid = s_info["stack_id"] + ":" + s_info["name"]
+ else:
+ sid = s_info["stack_name"] + ":" + s_info["name"]
+
+ placements[sid] = {}
+ placements[sid]["host"] = s.get("host")
+ placements[sid]["info"] = s_info
+
+ if not _app.resource.update_server_placements(change_of_placements=placements):
+ _app.status = "fail while updating server placements"
+ return
+
+ _app.resource.update_server_grouping(change_of_placements=placements,
+ new_groups={})
+
+ _app.resource.update_resource()
diff --git a/engine/src/valet/engine/search/resource.py b/engine/src/valet/engine/search/resource.py
new file mode 100644
index 0000000..18c8ca7
--- /dev/null
+++ b/engine/src/valet/engine/search/resource.py
@@ -0,0 +1,264 @@
+#
+# -------------------------------------------------------------------------
+# Copyright (c) 2019 AT&T Intellectual Property
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
+from valet.engine.resource_manager.resources.numa import NUMA
+
+
+class GroupResource(object):
+ """Container for all resource group includes
+
+ affinity, diversity, quorum-diversity, exclusivity, host-aggregate and availability.
+ """
+
+ def __init__(self):
+ self.name = None
+
+ self.group_type = "aggr"
+ self.factory = "nova"
+ self.level = "host"
+
+ self.metadata = {}
+
+ self.original_num_of_placed_servers = 0
+ self.num_of_placed_servers = 0
+
+ # key = host (host or rack), value = num_of_placed_servers
+ self.num_of_placed_servers_of_host = {}
+
+
+class HostResource(object):
+ """Container for hosting resource (host, rack)."""
+
+ def __init__(self):
+ # Host info
+ self.host_name = None
+
+ self.host_memberships = {} # all mapped groups to host
+
+ self.host_avail_vCPUs = 0 # remaining vCPUs after overcommit
+ self.host_avail_mem = 0 # remaining mem cap after
+ self.host_avail_local_disk = 0 # remaining local disk cap after overcommit
+
+ self.NUMA = None
+
+ self.host_num_of_placed_servers = 0 # the number of vms currently placed in this host
+
+ # If the host type is not determined yet,
+ # provide possible host types.
+ self.candidate_host_types = {}
+ self.old_candidate_host_types = {} # For rollback
+
+ # To track newly added host types.
+ self.new_host_aggregate_list = []
+
+ # Rack info
+ self.rack_name = None # where this host is located
+
+ self.rack_memberships = {}
+
+ self.rack_avail_vCPUs = 0
+ self.rack_avail_mem = 0
+ self.rack_avail_local_disk = 0
+
+ self.rack_num_of_placed_servers = 0
+
+ # To track newly added host types.
+ self.new_rack_aggregate_list = []
+
+ self.level = None # level of placement
+
+ self.sort_base = 0 # order to place
+
+ def get_host_type(self, _ha, _host_types):
+ """Take host-aggregate group and
+ return default host type of the host-aggregate.
+ """
+
+ host_type = None
+
+ if _host_types is None:
+ return host_type
+
+ host_type_list = _host_types[_ha.name]
+ for ht in host_type_list:
+ if "default" in ht.keys():
+ host_type = ht
+ break
+
+ return host_type
+
+ def adjust_avail_resources(self, _ha):
+ """Take host-aggregate group and
+ add it to host/rack memberships and
+ adjust the amount of available resources based on
+ the corresponding host type.
+ """
+
+ if _ha.name not in self.host_memberships.keys():
+ self.host_memberships[_ha.name] = _ha
+ self.new_host_aggregate_list.append(_ha.name)
+ if _ha.name not in self.rack_memberships.keys():
+ self.rack_memberships[_ha.name] = _ha
+ self.new_rack_aggregate_list.append(_ha.name)
+
+ host_type = self.get_host_type(_ha, self.candidate_host_types)
+
+ self.host_avail_vCPUs = host_type["avail_vCPUs"]
+ self.host_avail_mem = host_type["avail_mem"]
+ self.host_avail_local_disk = host_type["avail_local_disk"]
+
+ self.NUMA = NUMA(numa=host_type["NUMA"])
+
+ if self.candidate_host_types is not None:
+ for htk, htl in self.candidate_host_types.iteritems():
+ if htk == "mockup":
+ self.rack_avail_vCPUs -= htl[0]["avail_vCPUs"]
+ self.rack_avail_mem -= htl[0]["avail_mem"]
+ self.rack_avail_local_disk -= htl[0]["avail_local_disk"]
+
+ self.rack_avail_vCPUs += self.host_avail_vCPUs
+ self.rack_avail_mem += self.host_avail_mem
+ self.rack_avail_local_disk += self.host_avail_local_disk
+
+ break
+
+ def adjust_avail_rack_resources(self, _ha, _cpus, _mem, _disk):
+ """Take host-aggregate group and the amount of available resources
+ add the group into rack membership and
+ adjust the amount of available rack resources.
+ """
+
+ if _ha.name not in self.rack_memberships.keys():
+ self.rack_memberships[_ha.name] = _ha
+ self.new_rack_aggregate_list.append(_ha.name)
+
+ self.rack_avail_vCPUs = _cpus
+ self.rack_avail_mem = _mem
+ self.rack_avail_local_disk = _disk
+
+ def rollback_avail_resources(self, _ha):
+ if _ha.name in self.new_host_aggregate_list:
+ del self.host_memberships[_ha.name]
+ self.new_host_aggregate_list.remove(_ha.name)
+ if _ha.name in self.new_rack_aggregate_list:
+ del self.rack_memberships[_ha.name]
+ self.new_rack_aggregate_list.remove(_ha.name)
+
+ host_type = self.get_host_type(_ha, self.old_candidate_host_types)
+
+ if self.old_candidate_host_types is not None:
+ for htk, htl in self.old_candidate_host_types.iteritems():
+ if htk == "mockup":
+ self.host_avail_vCPUs = htl[0]["avail_vCPUs"]
+ self.host_avail_mem = htl[0]["avail_mem"]
+ self.host_avail_local_disk = htl[0]["avail_local_disk"]
+
+ self.NUMA = NUMA(numa=htl[0]["NUMA"])
+
+ self.rack_avail_vCPUs -= host_type["avail_vCPUs"]
+ self.rack_avail_mem -= host_type["avail_mem"]
+ self.rack_avail_local_disk -= host_type["avail_local_disk"]
+
+ self.rack_avail_vCPUs += self.host_avail_vCPUs
+ self.rack_avail_mem += self.host_avail_mem
+ self.rack_avail_local_disk += self.host_avail_local_disk
+
+ break
+
+ def rollback_avail_rack_resources(self, _ha, _cpus, _mem, _disk):
+ if _ha.name in self.new_rack_aggregate_list:
+ del self.rack_memberships[_ha.name]
+ self.new_rack_aggregate_list.remove(_ha.name)
+
+ self.rack_avail_vCPUs = _cpus
+ self.rack_avail_mem = _mem
+ self.rack_avail_local_disk = _disk
+
+ def get_resource_name(self, _level):
+ name = "unknown"
+
+ if _level == "rack":
+ name = self.rack_name
+ elif _level == "host":
+ name = self.host_name
+
+ return name
+
+ def get_vcpus(self, _level):
+ avail_vcpus = 0
+
+ if _level == "rack":
+ avail_vcpus = self.rack_avail_vCPUs
+ elif _level == "host":
+ avail_vcpus = self.host_avail_vCPUs
+
+ return avail_vcpus
+
+ def get_mem(self, _level):
+ avail_mem = 0
+
+ if _level == "rack":
+ avail_mem = self.rack_avail_mem
+ elif _level == "host":
+ avail_mem = self.host_avail_mem
+
+ return avail_mem
+
+ def get_local_disk(self, _level):
+ avail_local_disk = 0
+
+ if _level == "rack":
+ avail_local_disk = self.rack_avail_local_disk
+ elif _level == "host":
+ avail_local_disk = self.host_avail_local_disk
+
+ return avail_local_disk
+
+ def get_memberships(self, _level):
+ memberships = None
+
+ if _level == "rack":
+ memberships = self.rack_memberships
+ elif _level == "host":
+ memberships = self.host_memberships
+
+ return memberships
+
+ def get_all_memberships(self, _level):
+ memberships = {}
+
+ if _level == "rack":
+ for mk, m in self.rack_memberships.iteritems():
+ memberships[mk] = m
+ for mk, m in self.host_memberships.iteritems():
+ memberships[mk] = m
+ elif _level == "host":
+ for mk, m in self.host_memberships.iteritems():
+ memberships[mk] = m
+
+ return memberships
+
+ def get_num_of_placed_servers(self, _level):
+ num_of_servers = 0
+
+ if _level == "rack":
+ num_of_servers = self.rack_num_of_placed_servers
+ elif _level == "host":
+ num_of_servers = self.host_num_of_placed_servers
+
+ return num_of_servers
diff --git a/engine/src/valet/engine/search/search.py b/engine/src/valet/engine/search/search.py
new file mode 100644
index 0000000..40added
--- /dev/null
+++ b/engine/src/valet/engine/search/search.py
@@ -0,0 +1,708 @@
+#
+# -------------------------------------------------------------------------
+# Copyright (c) 2019 AT&T Intellectual Property
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
+import copy
+import operator
+
+from valet.engine.app_manager.server import Server
+from valet.engine.resource_manager.resources.datacenter import Datacenter
+from valet.engine.search.avail_resources import AvailResources
+from valet.engine.search.constraint_solver import ConstraintSolver
+from valet.engine.search.resource import GroupResource, HostResource
+from valet.engine.search.search_helper import *
+
+
+class Search(object):
+ """Bin-packing approach in the hierachical datacenter layout."""
+
+ def __init__(self, _logger):
+ self.logger = _logger
+
+ # Search inputs
+ self.app = None
+ self.resource = None
+
+ # Snapshot of current resource status
+ self.avail_hosts = {}
+ self.avail_groups = {}
+
+ # Search results
+ self.node_placements = {}
+ self.prior_placements = {} # TODO
+ self.num_of_hosts = 0
+
+ # Optimization criteria
+ self.CPU_weight = -1
+ self.mem_weight = -1
+ self.local_disk_weight = -1
+
+ self.constraint_solver = None
+
+ def _init_search(self, _app):
+ """Init the search information and the output results."""
+
+ self.app = _app
+ self.resource = _app.resource
+
+ self.avail_hosts.clear()
+ self.avail_groups.clear()
+
+ self.node_placements.clear()
+ self.prior_placements.clear() # TODO
+ self.num_of_hosts = 0
+
+ self.CPU_weight = -1
+ self.mem_weight = -1
+ self.local_disk_weight = -1
+
+ self.constraint_solver = ConstraintSolver(self.logger)
+
+ self._create_avail_groups()
+ self._create_avail_hosts()
+
+ # TODO
+ # if len(self.app.old_vm_map) > 0:
+ # self._adjust_resources()
+
+ self._set_resource_weights()
+
+ def _create_avail_groups(self):
+ """Collect all available resource groups.
+
+ Group type is affinity, diversity, quorum-diversity, exclusivity,
+ availability-zone, host-aggregate, server-group.
+ """
+
+ for gk, g in self.resource.groups.iteritems():
+ if g.status != "enabled":
+ self.logger.debug("group (" + g.name + ") disabled")
+ continue
+
+ gr = GroupResource()
+ gr.name = gk
+
+ gr.group_type = g.group_type
+ gr.factory = g.factory
+
+ if g.level is not None:
+ gr.level = g.level
+ else:
+ gr.level = "host"
+
+ for mk, mv in g.metadata.iteritems():
+ gr.metadata[mk] = mv
+
+ gr.original_num_of_placed_servers = len(g.server_list)
+ gr.num_of_placed_servers = len(g.server_list)
+
+ for hk in g.member_hosts.keys():
+ gr.num_of_placed_servers_of_host[hk] = len(g.member_hosts[hk])
+
+ self.avail_groups[gk] = gr
+
+ def _create_avail_hosts(self):
+ """Create all available hosts."""
+
+ for hk, host in self.resource.hosts.iteritems():
+ if not host.is_available():
+ self.logger.warning("host (" + host.name + ") not available at this time")
+ continue
+
+ hr = HostResource()
+ hr.host_name = hk
+
+ for mk in host.memberships.keys():
+ if mk in self.avail_groups.keys():
+ hr.host_memberships[mk] = self.avail_groups[mk]
+
+ # Not used by Valet, only capacity planning
+ try:
+ for htk, ht in host.candidate_host_types.iteritems():
+ hr.candidate_host_types[htk] = copy.deepcopy(ht)
+ except AttributeError:
+ hr.candidate_host_types = {}
+
+ hr.host_avail_vCPUs = host.avail_vCPUs
+ hr.host_avail_mem = host.avail_mem_cap
+ hr.host_avail_local_disk = host.avail_local_disk_cap
+
+ hr.NUMA = host.NUMA # NOTE: refer to host's NUMA, instead of deepcopy.
+
+ hr.host_num_of_placed_servers = len(host.server_list)
+
+ rack = host.host_group
+ if isinstance(rack, Datacenter):
+ hr.rack_name = "any"
+ else:
+ if not rack.is_available():
+ continue
+
+ hr.rack_name = rack.name
+
+ for mk in rack.memberships.keys():
+ if mk in self.avail_groups.keys():
+ hr.rack_memberships[mk] = self.avail_groups[mk]
+
+ hr.rack_avail_vCPUs = rack.avail_vCPUs
+ hr.rack_avail_mem = rack.avail_mem_cap
+ hr.rack_avail_local_disk = rack.avail_local_disk_cap
+
+ hr.rack_num_of_placed_servers = len(rack.server_list)
+
+ if hr.host_num_of_placed_servers > 0:
+ self.num_of_hosts += 1
+
+ self.avail_hosts[hk] = hr
+
+ def _set_resource_weights(self):
+ """Compute weight of each resource type.
+
+ As larger weight, as more important resource to be considered.
+ """
+
+ denominator = 0.0
+ for _, w in self.app.optimization_priority:
+ denominator += w
+
+ for t, w in self.app.optimization_priority:
+ if t == "cpu":
+ self.CPU_weight = float(w / denominator)
+ elif t == "mem":
+ self.mem_weight = float(w / denominator)
+ elif t == "lvol":
+ self.local_disk_weight = float(w / denominator)
+
+ def place(self, _app):
+ """Determine placements of new app creation."""
+
+ self._init_search(_app)
+
+ self.logger.info("search......")
+
+ open_node_list = self._open_list(self.app.servers, self.app.groups)
+
+ avail_resources = AvailResources(LEVEL[len(LEVEL) - 1])
+ avail_resources.avail_hosts = self.avail_hosts
+ avail_resources.set_next_level() # NOTE(Gueyoung): skip 'cluster' level
+
+ return self._run_greedy(open_node_list, avail_resources, "new")
+
+ # TODO: for update opt.
+ def re_place(self, _app_topology):
+ pass
+
+ def _re_place(self):
+ pass
+
+ def _open_list(self, _servers, _groups):
+ """Extract all servers and groups of each level (rack, host)."""
+
+ open_node_list = []
+
+ for _, s in _servers.iteritems():
+ self._set_node_weight(s)
+ open_node_list.append(s)
+
+ for _, g in _groups.iteritems():
+ self._set_node_weight(g)
+ open_node_list.append(g)
+
+ return open_node_list
+
+ def _set_node_weight(self, _v):
+ """Compute each server's or group's weight.
+
+ As larger weight, as more important one to be considered.
+ """
+
+ _v.sort_base = -1
+ _v.sort_base = self.CPU_weight * _v.vCPU_weight
+ _v.sort_base += self.mem_weight * _v.mem_weight
+ _v.sort_base += self.local_disk_weight * _v.local_volume_weight
+
+ # TODO: for update opt.
+ def _open_prior_list(self, _vms, _groups):
+ pass
+
+ def _adjust_resources(self):
+ pass
+
+ def _run_greedy(self, _open_node_list, _avail_resources, _mode):
+ """Search placements with greedy algorithm."""
+
+ _open_node_list.sort(key=operator.attrgetter("sort_base"), reverse=True)
+
+ for n in _open_node_list:
+ self.logger.debug("open node = " + n.vid + " cpus = " + str(n.vCPUs) + " sort = " + str(n.sort_base))
+
+ while len(_open_node_list) > 0:
+ n = _open_node_list.pop(0)
+
+ # TODO
+ # if _mode == "new":
+ best_resource = self._get_best_resource(n, _avail_resources, _mode)
+ # else:
+ # best_resource = self._get_best_resource_for_prior(n, _avail_resources, _mode)
+
+ if best_resource is None:
+ self.logger.error(self.app.status)
+ return False
+ else:
+ self._deduct_resources(_avail_resources.level, best_resource, n, _mode)
+ # TODO
+ # if _mode == "new":
+ self._close_node_placement(_avail_resources.level, best_resource, n)
+ # else:
+ # self._close_prior_placement(_avail_resources.level, best_resource, n)
+
+ return True
+
+ def _get_best_resource(self, _n, _avail_resources, _mode):
+ """Ddetermine the best placement for given server or affinity group."""
+
+ candidate_list = []
+ prior_resource = None
+
+ # If this is already placed one
+ if _n in self.prior_placements.keys():
+ prior_resource = _avail_resources.get_candidate(self.prior_placements[_n])
+ candidate_list.append(prior_resource)
+ else:
+ # TODO: need a list of candidates given as input?
+
+ _avail_resources.set_candidates()
+
+ candidate_list = self.constraint_solver.get_candidate_list(_n,
+ _avail_resources,
+ self.avail_hosts,
+ self.avail_groups)
+
+ if len(candidate_list) == 0:
+ if self.app.status == "ok":
+ if self.constraint_solver.status != "ok":
+ self.app.status = self.constraint_solver.status
+ else:
+ self.app.status = "fail while getting candidate hosts"
+ return None
+
+ if len(candidate_list) > 1:
+ self._set_compute_sort_base(_avail_resources.level, candidate_list)
+ candidate_list.sort(key=operator.attrgetter("sort_base"))
+
+ for c in candidate_list:
+ rn = c.get_resource_name(_avail_resources.level)
+ avail_cpus = c.get_vcpus(_avail_resources.level)
+ self.logger.debug("candidate = " + rn + " cpus = " + str(avail_cpus) + " sort = " + str(c.sort_base))
+
+ best_resource = None
+ if _avail_resources.level == "host" and isinstance(_n, Server):
+ best_resource = copy.deepcopy(candidate_list[0])
+ best_resource.level = "host"
+ else:
+ while len(candidate_list) > 0:
+ cr = candidate_list.pop(0)
+
+ (servers, groups) = get_next_placements(_n, _avail_resources.level)
+ open_node_list = self._open_list(servers, groups)
+
+ avail_resources = AvailResources(_avail_resources.level)
+ resource_name = cr.get_resource_name(_avail_resources.level)
+
+ avail_resources.set_next_avail_hosts(_avail_resources.avail_hosts, resource_name)
+ avail_resources.set_next_level()
+
+ # Recursive call
+ if self._run_greedy(open_node_list, avail_resources, _mode):
+ best_resource = copy.deepcopy(cr)
+ best_resource.level = _avail_resources.level
+ break
+ else:
+ if prior_resource is None:
+ self.logger.warning("rollback candidate = " + resource_name)
+
+ self._rollback_resources(_n)
+ self._rollback_node_placement(_n)
+
+ # TODO(Gueyoung): how to track the original error status?
+ if len(candidate_list) > 0 and self.app.status != "ok":
+ self.app.status = "ok"
+ else:
+ break
+
+ if best_resource is None and len(candidate_list) == 0:
+ if self.app.status == "ok":
+ self.app.status = "no available hosts"
+ self.logger.warning(self.app.status)
+
+ return best_resource
+
+ # TODO: for update opt.
+ def _get_best_resource_for_prior(self, _n, _avail_resources, _mode):
+ pass
+
+ def _set_compute_sort_base(self, _level, _candidate_list):
+ """Compute the weight of each candidate host."""
+
+ for c in _candidate_list:
+ cpu_ratio = -1
+ mem_ratio = -1
+ local_disk_ratio = -1
+
+ if _level == "rack":
+ cpu_ratio = float(c.rack_avail_vCPUs) / float(self.resource.CPU_avail)
+ mem_ratio = float(c.rack_avail_mem) / float(self.resource.mem_avail)
+ local_disk_ratio = float(c.rack_avail_local_disk) / float(self.resource.local_disk_avail)
+ elif _level == "host":
+ cpu_ratio = float(c.host_avail_vCPUs) / float(self.resource.CPU_avail)
+ mem_ratio = float(c.host_avail_mem) / float(self.resource.mem_avail)
+ local_disk_ratio = float(c.host_avail_local_disk) / float(self.resource.local_disk_avail)
+
+ c.sort_base = (1.0 - self.CPU_weight) * cpu_ratio + \
+ (1.0 - self.mem_weight) * mem_ratio + \
+ (1.0 - self.local_disk_weight) * local_disk_ratio
+
+ def _deduct_resources(self, _level, _best, _n, _mode):
+ """Apply new placement to hosting resources and groups."""
+
+ # Check if the chosen host is already applied.
+ if _mode == "new":
+ if _n in self.prior_placements.keys():
+ return
+ else:
+ if _n in self.node_placements.keys():
+ if _best.level == self.node_placements[_n].level:
+ return
+ else:
+ if _n in self.prior_placements.keys():
+ if _best.level == self.prior_placements[_n].level:
+ return
+
+ # Apply this placement to valet groups.
+
+ exclusivities = _n.get_exclusivities(_level)
+ if len(exclusivities) == 1:
+ exclusivity_group = exclusivities[exclusivities.keys()[0]]
+ self._add_exclusivity(_best, exclusivity_group)
+
+ if isinstance(_n, Group):
+ self._add_group(_level, _best, _n)
+
+ if len(_n.diversity_groups) > 0:
+ for _, div_group in _n.diversity_groups.iteritems():
+ self._add_group(_level, _best, div_group)
+
+ if len(_n.quorum_diversity_groups) > 0:
+ for _, div_group in _n.quorum_diversity_groups.iteritems():
+ self._add_group(_level, _best, div_group)
+
+ # Apply this placement to hosting resources.
+ if isinstance(_n, Server) and _level == "host":
+ self._deduct_server_resources(_best, _n)
+
+ def _add_exclusivity(self, _best, _group):
+ """Add new exclusivity group."""
+
+ if _group.vid not in self.avail_groups.keys():
+ gr = GroupResource()
+ gr.name = _group.vid
+ gr.group_type = "exclusivity"
+ gr.factory = "valet"
+ gr.level = _group.level
+ self.avail_groups[gr.name] = gr
+
+ self.logger.info("find exclusivity (" + _group.vid + ")")
+ else:
+ gr = self.avail_groups[_group.vid]
+
+ gr.num_of_placed_servers += 1
+
+ host_name = _best.get_resource_name(_group.level)
+ if host_name not in gr.num_of_placed_servers_of_host.keys():
+ gr.num_of_placed_servers_of_host[host_name] = 0
+ gr.num_of_placed_servers_of_host[host_name] += 1
+
+ chosen_host = self.avail_hosts[_best.host_name]
+ if _group.level == "host":
+ if _group.vid not in chosen_host.host_memberships.keys():
+ chosen_host.host_memberships[_group.vid] = gr
+ for _, np in self.avail_hosts.iteritems():
+ if chosen_host.rack_name != "any" and np.rack_name == chosen_host.rack_name:
+ if _group.vid not in np.rack_memberships.keys():
+ np.rack_memberships[_group.vid] = gr
+ else: # Rack level
+ for _, np in self.avail_hosts.iteritems():
+ if chosen_host.rack_name != "any" and np.rack_name == chosen_host.rack_name:
+ if _group.vid not in np.rack_memberships.keys():
+ np.rack_memberships[_group.vid] = gr
+
+ def _add_group(self, _level, _best, _group):
+ """Add new valet group."""
+
+ if _group.vid not in self.avail_groups.keys():
+ gr = GroupResource()
+ gr.name = _group.vid
+ gr.group_type = _group.group_type
+ gr.factory = _group.factory
+ gr.level = _group.level
+ self.avail_groups[gr.name] = gr
+
+ self.logger.info("find " + _group.group_type + " (" + _group.vid + ")")
+ else:
+ gr = self.avail_groups[_group.vid]
+
+ if _group.level == _level:
+ gr.num_of_placed_servers += 1
+
+ host_name = _best.get_resource_name(_level)
+ if host_name not in gr.num_of_placed_servers_of_host.keys():
+ gr.num_of_placed_servers_of_host[host_name] = 0
+ gr.num_of_placed_servers_of_host[host_name] += 1
+
+ chosen_host = self.avail_hosts[_best.host_name]
+ if _level == "host":
+ if _group.vid not in chosen_host.host_memberships.keys():
+ chosen_host.host_memberships[_group.vid] = gr
+ for _, np in self.avail_hosts.iteritems():
+ if chosen_host.rack_name != "any" and np.rack_name == chosen_host.rack_name:
+ if _group.vid not in np.rack_memberships.keys():
+ np.rack_memberships[_group.vid] = gr
+ else: # Rack level
+ for _, np in self.avail_hosts.iteritems():
+ if chosen_host.rack_name != "any" and np.rack_name == chosen_host.rack_name:
+ if _group.vid not in np.rack_memberships.keys():
+ np.rack_memberships[_group.vid] = gr
+
+ def _deduct_server_resources(self, _best, _n):
+ """Apply the reduced amount of resources to the chosen host.
+
+ _n is a server and _best is a compute host.
+ """
+
+ chosen_host = self.avail_hosts[_best.host_name]
+
+ chosen_host.host_avail_vCPUs -= _n.vCPUs
+ chosen_host.host_avail_mem -= _n.mem
+ chosen_host.host_avail_local_disk -= _n.local_volume_size
+
+ # Apply placement decision into NUMA
+ if _n.need_numa_alignment():
+ s_info = {}
+ s_info["stack_id"] = "none"
+ s_info["stack_name"] = self.app.app_name
+ s_info["uuid"] = "none"
+ s_info["name"] = _n.name
+ s_info["vcpus"] = _n.vCPUs
+ s_info["mem"] = _n.mem
+
+ chosen_host.NUMA.deduct_server_resources(s_info)
+
+ # TODO: need non_NUMA server?
+ # else:
+ # chosen_host.NUMA.apply_cpus_fairly(_n.vCPUs)
+ # chosen_host.NUMA.apply_mem_fairly(_n.mem)
+
+ if chosen_host.host_num_of_placed_servers == 0:
+ self.num_of_hosts += 1
+
+ chosen_host.host_num_of_placed_servers += 1
+
+ for _, np in self.avail_hosts.iteritems():
+ if chosen_host.rack_name != "any" and np.rack_name == chosen_host.rack_name:
+ np.rack_avail_vCPUs -= _n.vCPUs
+ np.rack_avail_mem -= _n.mem
+ np.rack_avail_local_disk -= _n.local_volume_size
+
+ np.rack_num_of_placed_servers += 1
+
+ def _close_node_placement(self, _level, _best, _v):
+ """Record the final placement decision."""
+
+ if _v not in self.node_placements.keys() and _v not in self.prior_placements.keys():
+ if _level == "host" or isinstance(_v, Group):
+ self.node_placements[_v] = _best
+
+ def _close_prior_placement(self, _level, _best, _v):
+ """Set the decision for placed server or group."""
+
+ if _v not in self.prior_placements.keys():
+ if _level == "host" or isinstance(_v, Group):
+ self.prior_placements[_v] = _best
+
+ def _rollback_resources(self, _v):
+ """Rollback the placement."""
+
+ if isinstance(_v, Server):
+ self._rollback_server_resources(_v)
+ elif isinstance(_v, Group):
+ for _, v in _v.subgroups.iteritems():
+ self._rollback_resources(v)
+
+ if _v in self.node_placements.keys():
+ chosen_host = self.avail_hosts[self.node_placements[_v].host_name]
+ level = self.node_placements[_v].level
+
+ if isinstance(_v, Group):
+ self._remove_group(chosen_host, _v, level)
+
+ exclusivities = _v.get_exclusivities(level)
+ if len(exclusivities) == 1:
+ ex_group = exclusivities[exclusivities.keys()[0]]
+ self._remove_exclusivity(chosen_host, ex_group)
+
+ if len(_v.diversity_groups) > 0:
+ for _, div_group in _v.diversity_groups.iteritems():
+ self._remove_group(chosen_host, div_group, level)
+
+ if len(_v.quorum_diversity_groups) > 0:
+ for _, div_group in _v.quorum_diversity_groups.iteritems():
+ self._remove_group(chosen_host, div_group, level)
+
+ def _remove_exclusivity(self, _chosen_host, _group):
+ """Remove the exclusivity group."""
+
+ gr = self.avail_groups[_group.vid]
+
+ host_name = _chosen_host.get_resource_name(_group.level)
+
+ gr.num_of_placed_servers -= 1
+ gr.num_of_placed_servers_of_host[host_name] -= 1
+
+ if gr.num_of_placed_servers_of_host[host_name] == 0:
+ del gr.num_of_placed_servers_of_host[host_name]
+
+ if gr.num_of_placed_servers == 0:
+ del self.avail_groups[_group.vid]
+
+ if _group.level == "host":
+ if _chosen_host.host_num_of_placed_servers == 0 and \
+ _group.vid in _chosen_host.host_memberships.keys():
+ del _chosen_host.host_memberships[_group.vid]
+
+ for _, np in self.avail_hosts.iteritems():
+ if _chosen_host.rack_name != "any" and np.rack_name == _chosen_host.rack_name:
+ if _group.vid in np.rack_memberships.keys():
+ del np.rack_memberships[_group.vid]
+ else: # Rack level
+ if _chosen_host.rack_num_of_placed_servers == 0:
+ for _, np in self.avail_hosts.iteritems():
+ if _chosen_host.rack_name != "any" and np.rack_name == _chosen_host.rack_name:
+ if _group.vid in np.rack_memberships.keys():
+ del np.rack_memberships[_group.vid]
+
+ def _remove_group(self, _chosen_host, _group, _level):
+ """Remove valet group."""
+
+ if _group.level == _level:
+ gr = self.avail_groups[_group.vid]
+
+ host_name = _chosen_host.get_resource_name(_level)
+
+ gr.num_of_placed_servers -= 1
+ gr.num_of_placed_servers_of_host[host_name] -= 1
+
+ if gr.num_of_placed_servers_of_host[host_name] == 0:
+ del gr.num_of_placed_servers_of_host[host_name]
+
+ if gr.num_of_placed_servers == 0:
+ del self.avail_groups[_group.vid]
+
+ exist_group = True
+ if _group.vid not in self.avail_groups.keys():
+ exist_group = False
+ else:
+ if host_name not in gr.num_of_placed_servers_of_host.keys():
+ exist_group = False
+
+ if _level == "host":
+ if not exist_group and _group.vid in _chosen_host.host_memberships.keys():
+ del _chosen_host.host_memberships[_group.vid]
+
+ for _, np in self.avail_hosts.iteritems():
+ if _chosen_host.rack_name != "any" and np.rack_name == _chosen_host.rack_name:
+ if _group.vid in np.rack_memberships.keys():
+ del np.rack_memberships[_group.vid]
+ else: # Rack level
+ if not exist_group:
+ for _, np in self.avail_hosts.iteritems():
+ if _chosen_host.rack_name != "any" and np.rack_name == _chosen_host.rack_name:
+ if _group.vid in np.rack_memberships.keys():
+ del np.rack_memberships[_group.vid]
+
+ def _rollback_server_resources(self, _v):
+ """Return back the amount of resources to host."""
+
+ if _v in self.node_placements.keys():
+ chosen_host = self.avail_hosts[self.node_placements[_v].host_name]
+
+ chosen_host.host_avail_vCPUs += _v.vCPUs
+ chosen_host.host_avail_mem += _v.mem
+ chosen_host.host_avail_local_disk += _v.local_volume_size
+
+ # Apply rollback into NUMA
+ if _v.need_numa_alignment():
+ s_info = {}
+ s_info["stack_id"] = "none"
+ s_info["stack_name"] = self.app.app_name
+ s_info["uuid"] = "none"
+ s_info["name"] = _v.name
+ s_info["vcpus"] = _v.vCPUs
+ s_info["mem"] = _v.mem
+
+ chosen_host.NUMA.rollback_server_resources(s_info)
+
+ chosen_host.host_num_of_placed_servers -= 1
+
+ if chosen_host.host_num_of_placed_servers == 0:
+ self.num_of_hosts -= 1
+
+ for _, np in self.avail_hosts.iteritems():
+ if chosen_host.rack_name != "any" and np.rack_name == chosen_host.rack_name:
+ np.rack_avail_vCPUs += _v.vCPUs
+ np.rack_avail_mem += _v.mem
+ np.rack_avail_local_disk += _v.local_volume_size
+
+ np.rack_num_of_placed_servers -= 1
+
+ # If the chosen host was a new host and its host type was unknown,
+ # rollback to the original unknown state.
+ if chosen_host.host_num_of_placed_servers == 0:
+ if chosen_host.old_candidate_host_types is not None and len(chosen_host.old_candidate_host_types) > 0:
+ flavor_type_list = _v.get_flavor_types()
+ ha = self.avail_groups[flavor_type_list[0]]
+
+ chosen_host.rollback_avail_resources(ha)
+ chosen_host.candidate_host_types = copy.deepcopy(chosen_host.old_candidate_host_types)
+ chosen_host.old_candidate_host_types.clear()
+
+ for hrk, hr in self.avail_hosts.iteritems():
+ if hrk != chosen_host.host_name:
+ if hr.rack_name == chosen_host.rack_name:
+ hr.rollback_avail_rack_resources(ha,
+ chosen_host.rack_avail_vCPUs,
+ chosen_host.rack_avail_mem,
+ chosen_host.rack_avail_local_disk)
+
+ def _rollback_node_placement(self, _v):
+ """Remove placement decisions."""
+
+ if _v in self.node_placements.keys():
+ del self.node_placements[_v]
+
+ if isinstance(_v, Group):
+ for _, sg in _v.subgroups.iteritems():
+ self._rollback_node_placement(sg)
diff --git a/engine/src/valet/engine/search/search_helper.py b/engine/src/valet/engine/search/search_helper.py
new file mode 100644
index 0000000..0e64ef7
--- /dev/null
+++ b/engine/src/valet/engine/search/search_helper.py
@@ -0,0 +1,43 @@
+#
+# -------------------------------------------------------------------------
+# Copyright (c) 2019 AT&T Intellectual Property
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
+#!/bin/python
+
+
+from valet.engine.app_manager.group import Group, LEVEL
+
+
+def get_next_placements(_n, _level):
+ """Get servers and groups to be handled in the next level of search."""
+
+ servers = {}
+ groups = {}
+
+ if isinstance(_n, Group):
+ if LEVEL.index(_n.level) < LEVEL.index(_level):
+ groups[_n.vid] = _n
+ else:
+ for _, sg in _n.subgroups.iteritems():
+ if isinstance(sg, Group):
+ groups[sg.vid] = sg
+ else:
+ servers[sg.vid] = sg
+ else:
+ servers[_n.vid] = _n
+
+ return servers, groups