Diffstat (limited to 'engine')
43 files changed, 9283 insertions, 0 deletions
diff --git a/engine/src/valet/engine/app_manager/__init__.py b/engine/src/valet/engine/app_manager/__init__.py new file mode 100644 index 0000000..bd50995 --- /dev/null +++ b/engine/src/valet/engine/app_manager/__init__.py @@ -0,0 +1,18 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# diff --git a/engine/src/valet/engine/app_manager/app.py b/engine/src/valet/engine/app_manager/app.py new file mode 100644 index 0000000..de6fb8f --- /dev/null +++ b/engine/src/valet/engine/app_manager/app.py @@ -0,0 +1,716 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +import copy +import json +import re +import six + +from valet.engine.app_manager.app_parser import Parser +from valet.engine.app_manager.group import Group +from valet.engine.app_manager.server import Server + + +class App(object): + """Container to deliver the status of request.""" + + def __init__(self, _req_id, _use_dha, _logger): + + self.last_req_id = _req_id + + self.datacenter_id = None + + # stack_id given from the platform (e.g., OpenStack) + self.app_id = None + + # stack_name as datacenter_id + ":" + tenant_id + ":" + vf_module_name + self.app_name = None + + self.tenant_id = None + + # Info for group rule scope + self.service_instance_id = None + self.vnf_instance_id = None + self.vnf_instance_name = None + + # Stack resources. key = server's orch_id + self.stack = {} + + self.resource = None + + self.logger = _logger + + self.parser = Parser(self.logger) + + self.groups = {} # all valet groups used in this app + self.servers = {} # requested servers (e.g., VMs) + + # Store prior server info. 
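+        # Filled from the previously stored stack record when handling a delete
+        # request (see init_prior_app); kept separate from the current servers.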
+ self.prior_servers = {} + + self.status = "ok" + + self.state = "plan" + + self.prior_state = "unknown" + + # Check if do not want rollback for create + self.suppress_rollback = False + + # For placement optimization + self.total_CPU = 0 + self.total_mem = 0 + self.total_local_vol = 0 + self.optimization_priority = None + + self.use_dha = _use_dha + + def set_resource(self, _resource): + self.resource = _resource + + def init_for_create(self, _req): + """Validate and init app info based on the given request.""" + + self.state = "create" + + if "datacenter" in _req.keys(): + if "id" in _req["datacenter"].keys(): + if _req["datacenter"]["id"] is not None: + self.datacenter_id = _req["datacenter"]["id"].strip() + else: + self.status = "datacenter_id is None" + return + else: + self.status = "no datacenter_id in request" + return + + if "url" in _req["datacenter"].keys(): + if _req["datacenter"]["url"] is not None: + if not self._match_url(_req["datacenter"]["url"].strip()): + self.status = "mal-formed url" + return + else: + self.status = "url is None" + return + else: + self.status = "no url in request" + return + else: + self.status = "no datacenter info in request" + return + + if "tenant_id" in _req.keys(): + if _req["tenant_id"] is not None: + self.tenant_id = _req["tenant_id"] + else: + self.status = "tenant_id is None" + return + else: + self.status = "no tenant_id in request" + return + + if "service_instance_id" in _req.keys(): + if _req["service_instance_id"] is not None: + self.service_instance_id = _req["service_instance_id"] + else: + self.status = "service_id is None" + return + else: + self.status = "no service_instance_id in request" + return + + if "vnf_instance_id" in _req.keys(): + if _req["vnf_instance_id"] is not None: + self.vnf_instance_id = _req["vnf_instance_id"] + else: + self.status = "vnf_id is None" + return + else: + self.status = "no vnf_instance_id in request" + return + + if "vnf_instance_name" in _req.keys(): + if _req["vnf_instance_name"] is not None: + self.vnf_instance_name = _req["vnf_instance_name"] + else: + self.status = "vnf_name is None" + return + else: + self.status = "no vnf_instance_name in request" + return + + if "stack_name" in _req.keys(): + if _req["stack_name"] is not None: + self.app_name = self.datacenter_id + ":" + self.tenant_id + ":" + _req["stack_name"].strip() + else: + self.status = "stack_name is None" + return + else: + self.status = "no stack_name in request" + return + + if "stack" in _req.keys(): + self.stack = self._validate_stack(_req["stack"]) + else: + self.status = "no stack in request" + return + + def init_for_delete(self, _req): + """Validate and init app info for marking delete.""" + + self.state = "delete" + + if "datacenter" in _req.keys(): + if "id" in _req["datacenter"].keys(): + if _req["datacenter"]["id"] is not None: + self.datacenter_id = _req["datacenter"]["id"].strip() + else: + self.status = "datacenter_id is None" + return + else: + self.status = "no datacenter_id in request" + return + else: + self.status = "no datacenter info in request" + return + + if "tenant_id" in _req.keys(): + if _req["tenant_id"] is not None: + self.tenant_id = _req["tenant_id"] + else: + self.status = "tenant_id is None" + return + else: + self.status = "no tenant_id in request" + return + + if "stack_name" in _req.keys(): + if _req["stack_name"] is not None: + self.app_name = self.datacenter_id + ":" + self.tenant_id + ":" + _req["stack_name"].strip() + else: + self.status = "stack_name is None" + return + else: + self.status = 
"no stack_name in request" + return + + def init_for_confirm(self, _req): + """Validate and init app info for confirm.""" + + # Tempoary state and will depends on prior state + self.state = "confirm" + + if "stack_id" in _req.keys(): + if _req["stack_id"] is not None: + stack_id = _req["stack_id"].strip() + + stack_id_elements = stack_id.split('/', 1) + if len(stack_id_elements) > 1: + self.app_id = stack_id_elements[1] + else: + self.app_id = stack_id + + self.logger.debug("stack_id = " + self.app_id) + + else: + self.status = "null stack_id in request" + return + else: + self.status = "no stack_id in request" + return + + def init_for_rollback(self, _req): + """Validate and init app info for rollback.""" + + # Tempoary state and will depends on prior state + self.state = "rollback" + + if "stack_id" in _req.keys(): + if _req["stack_id"] is not None: + stack_id = _req["stack_id"].strip() + + stack_id_elements = stack_id.split('/', 1) + if len(stack_id_elements) > 1: + self.app_id = stack_id_elements[1] + else: + self.app_id = stack_id + + self.logger.debug("stack_id = " + self.app_id) + else: + # If the stack fails, stack_id can be null. + self.app_id = "none" + + self.logger.debug("stack_id = None") + else: + self.status = "no stack_id in request" + return + + if "suppress_rollback" in _req.keys(): + self.suppress_rollback = _req["suppress_rollback"] + + if "error_message" in _req.keys(): + # TODO(Gueyoung): analyze the error message. + + if _req["error_message"] is None: + self.logger.warning("error message from platform: none") + else: + self.logger.warning("error message from platform:" + _req["error_message"]) + + def init_prior_app(self, _prior_app): + """Init with the prior app info.""" + + self.datacenter_id = _prior_app.get("datacenter") + + self.app_name = _prior_app.get("stack_name") + + if _prior_app["uuid"] != "none": + # Delete case. + self.app_id = _prior_app.get("uuid") + + self.tenant_id = _prior_app.get("tenant_id") + + metadata = json.loads(_prior_app.get("metadata")) + self.service_instance_id = metadata.get("service_instance_id") + self.vnf_instance_id = metadata.get("vnf_instance_id") + self.vnf_instance_name = metadata.get("vnf_instance_name") + + self.servers = json.loads(_prior_app.get("servers")) + + prior_state = _prior_app.get("state") + + if self.state == "confirm": + if prior_state == "create": + self.state = "created" + elif prior_state == "delete": + self.state = "deleted" + elif self.state == "rollback": + if prior_state == "create": + if self.suppress_rollback and self.app_id != "none": + self.state = "created" + else: + self.state = "deleted" + + if prior_state == "delete": + self.state = "created" + elif self.state == "delete": + self.prior_state = prior_state + self.prior_servers = copy.deepcopy(self.servers) + else: + self.status = "unknown state" + + def _validate_stack(self, _stack): + """Check if the stack is for Valet to make decision. + + And, check if the format is correct. 
+ """ + + if len(_stack) == 0 or "resources" not in _stack.keys(): + self.status = "na: no resource in stack" + self.logger.warning("non-applicable to valet: no resource in stack") + return {} + + stack = {} + + for rk, r in _stack["resources"].iteritems(): + if "type" not in r.keys(): + self.status = "type key is missing in stack" + return None + + if r["type"] == "OS::Nova::Server": + if "properties" not in r.keys(): + self.status = "properties key is missing in stack" + return None + + if "name" not in r["properties"].keys(): + self.status = "name property is missing in stack" + return None + + if r["properties"]["name"] is None: + self.status = "name property is none" + return None + + if "flavor" not in r["properties"].keys(): + self.status = "flavor property is missing in stack" + return None + + if r["properties"]["flavor"] is None: + self.status = "flavor property is none" + return None + + stack[rk] = r + + if len(stack) == 0: + self.status = "na: no server resource in stack" + self.logger.warning("non-applicable to valet: no server resource in stack") + return {} + + first_resource = stack[stack.keys()[0]] + apply_valet = False + + # To apply Valet decision, availability_zone must exist. + # And its value contains host variable as a list element. + if "availability_zone" in first_resource["properties"].keys(): + az_value = first_resource["properties"]["availability_zone"] + if isinstance(az_value, list): + apply_valet = True + + for rk, r in stack.iteritems(): + if apply_valet: + if "availability_zone" not in r["properties"].keys(): + self.status = "az is missing in stack for valet" + return None + else: + az_value = r["properties"]["availability_zone"] + if not isinstance(az_value, list): + self.status = "host variable is missing in stack for valet" + return None + + if az_value[0] in ("none", "None") or az_value[1] in ("none", "None"): + self.status = "az value is missing in stack" + return None + else: + if "availability_zone" in r["properties"].keys(): + az_value = r["properties"]["availability_zone"] + if isinstance(az_value, list): + self.status = "host variable exists in stack for non-valet application" + return None + + if not apply_valet: + self.status = "na: pass valet" + self.logger.warning("non-applicable to valet") + return {} + else: + return stack + + def init_valet_groups(self): + """Create Valet groups from input request.""" + + for rk, r in self.stack.iteritems(): + properties = r.get("properties", {}) + metadata = properties.get("metadata", {}) + + if len(metadata) > 0: + valet_rules = metadata.get("valet_groups", None) + + if valet_rules is not None and valet_rules != "": + rule_list = [] + if isinstance(valet_rules, six.string_types): + rules = valet_rules.split(",") + for gr in rules: + rule_list.append(gr.strip()) + else: + self.status = "incorrect valet group metadata format" + self.logger.error(self.status) + return + + # Check rule validation of valet_groups. + self.status = self.resource.check_valid_rules(self.tenant_id, + rule_list, + use_ex=self.use_dha) + if self.status != "ok": + self.logger.error(self.status) + return + + self.status = self._make_valet_groups(properties.get("name"), + properties["availability_zone"][0], + rule_list) + if self.status != "ok": + self.logger.error(self.status) + return + + # Check and create server groups if they do not exist. 
+ scheduler_hints = properties.get("scheduler_hints", {}) + if len(scheduler_hints) > 0: + for hint_key in scheduler_hints.keys(): + if hint_key == "group": + hint = scheduler_hints[hint_key] + self.status = self._make_group(properties.get("name"), hint) + if self.status != "ok": + self.logger.error(self.status) + return + + def _make_valet_groups(self, _rk, _az, _rule_list): + """Create Valet groups that each server is involved.""" + + for rn in _rule_list: + rule = self.resource.group_rules[rn] + + # Valet group naming convention. + # It contains datacenter id and availability_zone + # followed by service id and vnf id + # depending on scope. + # And concatenate rule name. + # Exception: quorum-diversity + + group_id = self.datacenter_id + ":" + + if rule.rule_type != "quorum-diversity": + group_id += _az + ":" + + if rule.app_scope == "lcp": + group_id += rn + elif rule.app_scope == "service": + group_id += self.service_instance_id + ":" + rn + elif rule.app_scope == "vnf": + group_id += self.service_instance_id + ":" + self.vnf_instance_id + ":" + rn + else: + return "unknown app_scope value" + + if group_id in self.groups.keys(): + group = self.groups[group_id] + else: + group = Group(group_id) + group.group_type = rule.rule_type + group.factory = "valet" + group.level = rule.level + + self.groups[group_id] = group + + group.server_list.append(self.app_name + ":" + _rk) + + return "ok" + + def _make_group(self, _rk, _group_hint): + """Create the group request based on scheduler hint.""" + + if isinstance(_group_hint, dict): + # _group_hint is a single key/value pair + g = _group_hint[_group_hint.keys()[0]] + + r_type = g.get("type", "none") + if r_type != "OS::Nova::ServerGroup": + return "support only ServerGroup resource" + + properties = g.get("properties", {}) + if len(properties) == 0: + return "no properties" + + group_name = properties.get("name", None) + if group_name is None: + return "no group name" + group_name = group_name.strip() + + policies = properties.get("policies", []) + if len(policies) == 0: + return "no policy of the group" + + if len(policies) > 1: + return "multiple policies" + + # TODO: exclude soft-affinity and soft-anti-affinity? + + if group_name in self.groups.keys(): + group = self.groups[group_name] + else: + group = Group(group_name) + + policy = policies[0].strip() + if policy == "anti-affinity": + group_type = "diversity" + else: + group_type = policy + + group.group_type = group_type + group.factory = "server-group" + group.level = "host" + + self.groups[group_name] = group + else: + # group hint is uuid string. + rg = self.resource.get_group_by_uuid(_group_hint) + if rg is None: + return "unknown group found while making group" + + # TODO: exclude soft-affinity and soft-anti-affinity? 
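+            # Reuse the Group object if this server group was already seen in
+            # the request; otherwise build it from the resource manager's info.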
+ + if rg.name in self.groups.keys(): + group = self.groups[rg.name] + else: + group = Group(rg.name) + + group.group_type = rg.group_type + group.factory = rg.factory + group.level = "host" + + self.groups[rg.name] = group + + if group is not None: + group.server_list.append(self.app_name + ":" + _rk) + + return "ok" + + def parse(self): + """Extract servers and merge groups from stack for search.""" + + (self.servers, self.groups) = self.parser.set_servers(self.app_name, + self.stack, + self.groups) + + if len(self.servers) == 0 and len(self.groups) == 0: + self.status = "parse error for " + self.app_name + ": " + self.parser.status + return False + + return True + + def set_weight(self): + """Set relative weight of each servers and groups.""" + + for _, s in self.servers.iteritems(): + self._set_server_weight(s) + + for _, g in self.groups.iteritems(): + self._set_server_weight(g) + + for _, g in self.groups.iteritems(): + self._set_group_resource(g) + + for _, g in self.groups.iteritems(): + self._set_group_weight(g) + + def _set_server_weight(self, _v): + """Set relative weight of each server against available resource amount.""" + + if isinstance(_v, Group): + for _, sg in _v.subgroups.iteritems(): + self._set_server_weight(sg) + else: + if self.resource.CPU_avail > 0: + _v.vCPU_weight = float(_v.vCPUs) / float(self.resource.CPU_avail) + else: + _v.vCPU_weight = 1.0 + self.total_CPU += _v.vCPUs + + if self.resource.mem_avail > 0: + _v.mem_weight = float(_v.mem) / float(self.resource.mem_avail) + else: + _v.mem_weight = 1.0 + self.total_mem += _v.mem + + if self.resource.local_disk_avail > 0: + _v.local_volume_weight = float(_v.local_volume_size) / float(self.resource.local_disk_avail) + else: + if _v.local_volume_size > 0: + _v.local_volume_weight = 1.0 + else: + _v.local_volume_weight = 0.0 + self.total_local_vol += _v.local_volume_size + + def _set_group_resource(self, _g): + """Sum up amount of resources of servers for each affinity group.""" + + if isinstance(_g, Server): + return + + for _, sg in _g.subgroups.iteritems(): + self._set_group_resource(sg) + _g.vCPUs += sg.vCPUs + _g.mem += sg.mem + _g.local_volume_size += sg.local_volume_size + + def _set_group_weight(self, _group): + """Set relative weight of each affinity group against available resource amount.""" + + if self.resource.CPU_avail > 0: + _group.vCPU_weight = float(_group.vCPUs) / float(self.resource.CPU_avail) + else: + if _group.vCPUs > 0: + _group.vCPU_weight = 1.0 + else: + _group.vCPU_weight = 0.0 + + if self.resource.mem_avail > 0: + _group.mem_weight = float(_group.mem) / float(self.resource.mem_avail) + else: + if _group.mem > 0: + _group.mem_weight = 1.0 + else: + _group.mem_weight = 0.0 + + if self.resource.local_disk_avail > 0: + _group.local_volume_weight = float(_group.local_volume_size) / float(self.resource.local_disk_avail) + else: + if _group.local_volume_size > 0: + _group.local_volume_weight = 1.0 + else: + _group.local_volume_weight = 0.0 + + for _, sg in _group.subgroups.iteritems(): + if isinstance(sg, Group): + self._set_group_weight(sg) + + def set_optimization_priority(self): + """Determine the optimization priority among different types of resources.""" + + if len(self.groups) == 0 and len(self.servers) == 0: + return + + if self.resource.CPU_avail > 0: + app_cpu_weight = float(self.total_CPU) / float(self.resource.CPU_avail) + else: + if self.total_CPU > 0: + app_cpu_weight = 1.0 + else: + app_cpu_weight = 0.0 + + if self.resource.mem_avail > 0: + app_mem_weight = float(self.total_mem) / 
float(self.resource.mem_avail) + else: + if self.total_mem > 0: + app_mem_weight = 1.0 + else: + app_mem_weight = 0.0 + + if self.resource.local_disk_avail > 0: + app_local_vol_weight = float(self.total_local_vol) / float(self.resource.local_disk_avail) + else: + if self.total_local_vol > 0: + app_local_vol_weight = 1.0 + else: + app_local_vol_weight = 0.0 + + opt = [("cpu", app_cpu_weight), + ("mem", app_mem_weight), + ("lvol", app_local_vol_weight)] + + self.optimization_priority = sorted(opt, key=lambda resource: resource[1], reverse=True) + + def reset_servers(self): + """Get servers back from containers (i.e., affinity groups)""" + + servers = [] + for _, g in self.groups.iteritems(): + g.get_servers(servers) + + for s in servers: + self.servers[s.vid] = s + + def _match_url(self, _url): + """Check if the URL is a correct form.""" + + regex = re.compile( + r'^(?:http|ftp)s?://' # http:// or https:// + r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|' # domain + r'localhost|' # localhost + r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip + r'(?::\d+)?' # optional port + r'(?:/?|[/?]\S+)$', re.IGNORECASE) + + if re.match(regex, _url): + return True + else: + return False diff --git a/engine/src/valet/engine/app_manager/app_handler.py b/engine/src/valet/engine/app_manager/app_handler.py new file mode 100644 index 0000000..14ef35c --- /dev/null +++ b/engine/src/valet/engine/app_manager/app_handler.py @@ -0,0 +1,307 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +import operator +import time + +from valet.engine.app_manager.app import App + + +class AppHistory(object): + """Data container for scheduling decisions.""" + + def __init__(self, _key): + self.decision_key = _key + self.status = None + self.result = None + self.timestamp = None + + +class AppHandler(object): + """Handler for all requested applications.""" + + def __init__(self, _dbh, _use_dha, _logger): + self.dbh = _dbh + + self.decision_history = {} + self.max_decision_history = 5000 + self.min_decision_history = 1000 + + self.use_dha = _use_dha + + self.logger = _logger + + def validate_for_create(self, _req_id, _req): + """Validate create request and return app.""" + + app = App(_req_id, self.use_dha, self.logger) + app.init_for_create(_req) + + if app.status != "ok" and not app.status.startswith("na:"): + self.logger.error(app.status) + else: + self.logger.info("got 'create' for app = " + app.app_name) + + return app + + def validate_for_update(self, _req_id, _req): + """Validate update request and return app.""" + + app = App(_req_id, self.use_dha, self.logger) + + # Use create validation module. 
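+        # An update request carries the same payload as a create request,
+        # so reuse the create validation and then override the state.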
+ app.init_for_create(_req) + app.state = "update" + + if app.status != "ok" and not app.status.startswith("na:"): + self.logger.error(app.status) + else: + self.logger.info("got 'update' for app = " + app.app_name) + + return app + + def validate_for_delete(self, _req_id, _req): + """Validate delete request and return app.""" + + app = App(_req_id, self.use_dha, self.logger) + app.init_for_delete(_req) + + if app.status != "ok": + self.logger.error(app.status) + return app + + prior_app = self.dbh.get_stack(app.app_name) + if prior_app is None: + return None + + if len(prior_app) == 0: + app.status = "na: no prior request via valet" + return app + + # Once taking prior app, valet deterimnes placements or error. + app.init_prior_app(prior_app) + + self.logger.info("got 'delete' for app = " + app.app_name) + + return app + + def validate_for_confirm(self, _req_id, _req): + """Validate confirm request and return app.""" + + app = App(_req_id, self.use_dha, self.logger) + app.init_for_confirm(_req) + + if app.status != "ok": + self.logger.error(app.status) + return app + + stack_id_map = self.dbh.get_stack_id_map(app.last_req_id) + if stack_id_map is None: + return None + + if len(stack_id_map) == 0: + app.status = "na: not handled request via valet" + return app + + prior_app = self.dbh.get_stack(stack_id_map.get("stack_id")) + if prior_app is None: + return None + + if len(prior_app) == 0: + app.status = "cannot find prior stack info" + return app + + # Once taking prior app, valet deterimnes placements or error. + app.init_prior_app(prior_app) + + self.logger.info("got 'confirm' for app = " + app.app_name) + + return app + + def validate_for_rollback(self, _req_id, _req): + """Validate rollback request and return app.""" + + app = App(_req_id, self.use_dha, self.logger) + app.init_for_rollback(_req) + + if app.status != "ok": + self.logger.error(app.status) + return app + + stack_id_map = self.dbh.get_stack_id_map(app.last_req_id) + if stack_id_map is None: + return None + + if len(stack_id_map) == 0: + app.status = "na: not handled request via valet" + return app + + prior_app = self.dbh.get_stack(stack_id_map.get("stack_id")) + if prior_app is None: + return None + + if len(prior_app) == 0: + app.status = "cannot find prior stack info" + return app + + # Once taking prior app, valet deterimnes placements or error. + app.init_prior_app(prior_app) + + self.logger.info("got 'rollback' for app = " + app.app_name) + + return app + + def set_for_create(self, _app): + """Set for stack-creation request.""" + + # Set Valet groups. + _app.init_valet_groups() + if _app.status != "ok": + return + + # Set flavor properties for each server. + for rk, r in _app.stack.iteritems(): + if "vcpus" not in r["properties"].keys(): + flavor = _app.resource.get_flavor(r["properties"]["flavor"]) + + if flavor is None: + _app.status = "fail to get flavor details" + self.logger.error(_app.status) + return + + if flavor.status != "enabled": + # TODO(Gueyoung): what to do if flavor is disabled? + self.logger.warning("disabled flavor = " + flavor.name) + + r["properties"]["vcpus"] = flavor.vCPUs + r["properties"]["mem"] = flavor.mem_cap + r["properties"]["local_volume"] = flavor.disk_cap + + if len(flavor.extra_specs) > 0: + extra_specs = {} + for mk, mv in flavor.extra_specs.iteritems(): + extra_specs[mk] = mv + r["properties"]["extra_specs"] = extra_specs + + # Set servers. + # Once parsing app, valet deterimnes placements or error. 
+ if not _app.parse(): + self.logger.error(_app.status) + return + + return + + def set_for_update(self, _app): + """Set for stack-update request.""" + + # Set servers. + # Once parsing app, valet deterimnes placements or error. + if not _app.parse(): + self.logger.error(_app.status) + return + + # Skip stack-update and rely on platform at this version. + _app.status = "na:update: pass stack-update" + + return + + def check_history(self, _req_id): + """Check if the request is determined already.""" + + if _req_id in self.decision_history.keys(): + status = self.decision_history[_req_id].status + result = self.decision_history[_req_id].result + return status, result + else: + return None, None + + def record_history(self, _req_id, _status, _result): + """Record an app placement decision.""" + + if _req_id not in self.decision_history.keys(): + if len(self.decision_history) > self.max_decision_history: + self._flush_decision_history() + + app_history = AppHistory(_req_id) + app_history.status = _status + app_history.result = _result + app_history.timestamp = time.time() + + self.decision_history[_req_id] = app_history + + def _flush_decision_history(self): + """Unload app placement decisions.""" + + count = 0 + num_of_removes = len(self.decision_history) - self.min_decision_history + + remove_item_list = [] + for decision in (sorted(self.decision_history.values(), key=operator.attrgetter('timestamp'))): # type: AppHistory + remove_item_list.append(decision.decision_key) + count += 1 + if count == num_of_removes: + break + + for dk in remove_item_list: + del self.decision_history[dk] + + def store_app(self, _app): + """Store new app or update existing app.""" + + if _app.state == "create": + metadata = {"service_instance_id": _app.service_instance_id, "vnf_instance_id": _app.vnf_instance_id, + "vnf_instance_name": _app.vnf_instance_name} + + servers = {} + for sk, s in _app.servers.iteritems(): + servers[sk] = s.get_json_info() + + if not self.dbh.create_stack(_app.app_name, + _app.status, + _app.datacenter_id, _app.app_name, "none", + _app.tenant_id, metadata, + servers, {}, + _app.state, "none"): + return False + elif _app.state in ("delete", "created"): + metadata = {"service_instance_id": _app.service_instance_id, "vnf_instance_id": _app.vnf_instance_id, + "vnf_instance_name": _app.vnf_instance_name} + + if not self.dbh.update_stack(_app.app_name, + _app.status, + _app.datacenter_id, _app.app_name, _app.app_id, + _app.tenant_id, metadata, + _app.servers, _app.prior_servers, + _app.state, _app.prior_state): + return False + elif _app.state == "deleted": + if not self.dbh.delete_stack(_app.app_name): + return False + else: + self.logger.error("unknown operaton") + return False + + # To manage the map between request_id and Heat stack requested + if _app.state in ("create", "delete"): + if not self.dbh.create_stack_id_map(_app.last_req_id, _app.app_name): + return False + elif _app.state in ("created", "deleted"): + if not self.dbh.delete_stack_id_map(_app.last_req_id): + return False + + return True diff --git a/engine/src/valet/engine/app_manager/app_parser.py b/engine/src/valet/engine/app_manager/app_parser.py new file mode 100644 index 0000000..9e6336b --- /dev/null +++ b/engine/src/valet/engine/app_manager/app_parser.py @@ -0,0 +1,257 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with 
the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +import six + +from valet.engine.app_manager.group import Group, LEVEL +from valet.engine.app_manager.server import Server + + +class Parser(object): + + def __init__(self, _logger): + self.logger = _logger + + self.status = "ok" + + def set_servers(self, _app_name, _stack, _groups): + """Parse stack resources to set servers (e.g., VMs) for search.""" + + servers = {} + + for rk, r in _stack.iteritems(): + properties = r.get("properties") + + server_name = properties["name"].strip() + server_id = _app_name + ":" + server_name + + server = Server(server_id, rk) + + server.name = server_name + + flavor_id = properties.get("flavor") + if isinstance(flavor_id, six.string_types): + server.flavor = flavor_id.strip() + else: + server.flavor = str(flavor_id) + + image_id = properties.get("image", None) + if image_id is not None: + if isinstance(image_id, six.string_types): + server.image = image_id.strip() + else: + server.image = str(image_id) + + if "vcpus" in properties.keys(): + server.vCPUs = int(properties.get("vcpus")) + server.mem = int(properties.get("mem")) + server.local_volume_size = int(properties.get("local_volume")) + + ess = properties.get("extra_specs", {}) + if len(ess) > 0: + extra_specs = {} + for mk, mv in ess.iteritems(): + extra_specs[mk] = mv + server.extra_specs_list.append(extra_specs) + + az = properties.get("availability_zone", None) + if az is not None: + server.availability_zone = az[0].strip() + server.host_assignment_variable = az[1].strip() + if len(az) == 3: + server.host_assignment_inx = az[2] + + servers[server_id] = server + + if self._merge_diversity_groups(_groups, servers) is False: + self.status = "fail while merging diversity groups" + return {}, {} + if self._merge_quorum_diversity_groups(_groups, servers) is False: + self.status = "fail while merging quorum-diversity groups" + return {}, {} + if self._merge_exclusivity_groups(_groups, servers) is False: + self.status = "fail while merging exclusivity groups" + return {}, {} + if self._merge_affinity_groups(_groups, servers) is False: + self.status = "fail while merging affinity groups" + return {}, {} + + # To delete all exclusivity and diversity groups after merging + groups = {gk: g for gk, g in _groups.iteritems() if g.group_type == "affinity"} + + return servers, groups + + def _merge_diversity_groups(self, _groups, _servers): + """Merge diversity sub groups.""" + + for level in LEVEL: + for gk, group in _groups.iteritems(): + if group.level == level and group.group_type == "diversity": + for sk in group.server_list: + if sk in _servers.keys(): + group.subgroups[sk] = _servers[sk] + _servers[sk].diversity_groups[group.vid] = group + else: + self.status = "invalid server = " + sk + " in group = " + group.vid + return False + + return True + + def _merge_quorum_diversity_groups(self, _groups, _servers): + """Merge quorum-diversity sub groups.""" + + for level in LEVEL: + for gk, group in _groups.iteritems(): + if group.level == level and group.group_type == "quorum-diversity": + for sk in 
group.server_list: + if sk in _servers.keys(): + group.subgroups[sk] = _servers[sk] + _servers[sk].quorum_diversity_groups[group.vid] = group + else: + self.status = "invalid server = " + sk + " in group = " + group.vid + return False + + return True + + def _merge_exclusivity_groups(self, _groups, _servers): + """Merge exclusivity sub groups.""" + + for level in LEVEL: + for gk, group in _groups.iteritems(): + if group.level == level and group.group_type == "exclusivity": + for sk in group.server_list: + if sk in _servers.keys(): + group.subgroups[sk] = _servers[sk] + _servers[sk].exclusivity_groups[group.vid] = group + else: + self.status = "invalid server = " + sk + " in group = " + group.vid + return False + + return True + + def _merge_affinity_groups(self, _groups, _servers): + """Merge affinity subgroups.""" + + # To track each server's or group's parent group (i.e., affinity) + affinity_map = {} + + # To make cannonical order of groups + group_list = [gk for gk in _groups.keys()] + group_list.sort() + + for level in LEVEL: + for gk in group_list: + if gk in _groups.keys(): + if _groups[gk].level == level and _groups[gk].group_type == "affinity": + group = _groups[gk] + else: + continue + else: + continue + + group.server_list.sort() + + for sk in group.server_list: + if sk in _servers.keys(): + self._merge_server(group, sk, _servers, affinity_map) + else: + if sk not in affinity_map.keys(): + self.status = "invalid server = " + sk + " in group = " + group.vid + return False + + # If server belongs to the other group already, + # take the group as a subgroup of this group + if affinity_map[sk].vid != group.vid: + if group.is_parent_affinity(sk): + self._set_implicit_grouping(sk, group, affinity_map, _groups) + + return True + + def _merge_server(self, _group, _sk, _servers, _affinity_map): + """Merge a server into the group.""" + + _group.subgroups[_sk] = _servers[_sk] + _servers[_sk].surgroup = _group + _affinity_map[_sk] = _group + + self._add_implicit_diversity_groups(_group, _servers[_sk].diversity_groups) + self._add_implicit_quorum_diversity_groups(_group, _servers[_sk].quorum_diversity_groups) + self._add_implicit_exclusivity_groups(_group, _servers[_sk].exclusivity_groups) + self._add_memberships(_group, _servers[_sk]) + + del _servers[_sk] + + def _add_implicit_diversity_groups(self, _group, _diversity_groups): + """Add subgroup's diversity groups.""" + + for dk, div_group in _diversity_groups.iteritems(): + if LEVEL.index(div_group.level) >= LEVEL.index(_group.level): + _group.diversity_groups[dk] = div_group + + def _add_implicit_quorum_diversity_groups(self, _group, _quorum_diversity_groups): + """Add subgroup's quorum diversity groups.""" + + for dk, div_group in _quorum_diversity_groups.iteritems(): + if LEVEL.index(div_group.level) >= LEVEL.index(_group.level): + _group.quorum_diversity_groups[dk] = div_group + + def _add_implicit_exclusivity_groups(self, _group, _exclusivity_groups): + """Add subgroup's exclusivity groups.""" + + for ek, ex_group in _exclusivity_groups.iteritems(): + if LEVEL.index(ex_group.level) >= LEVEL.index(_group.level): + _group.exclusivity_groups[ek] = ex_group + + def _add_memberships(self, _group, _v): + """Add subgroups's host-aggregates and AZs.""" + + for extra_specs in _v.extra_specs_list: + _group.extra_specs_list.append(extra_specs) + + if isinstance(_v, Server): + if _v.availability_zone is not None: + if _v.availability_zone not in _group.availability_zone_list: + _group.availability_zone_list.append(_v.availability_zone) + + if 
isinstance(_v, Group): + for az in _v.availability_zone_list: + if az not in _group.availability_zone_list: + _group.availability_zone_list.append(az) + + def _set_implicit_grouping(self, _vk, _g, _affinity_map, _groups): + """Take server's most top parent as a child group of this group _g.""" + + tg = _affinity_map[_vk] # Where _vk currently belongs to + + if tg.vid in _affinity_map.keys(): # If the parent belongs to the other parent group + self._set_implicit_grouping(tg.vid, _g, _affinity_map, _groups) + else: + if LEVEL.index(tg.level) > LEVEL.index(_g.level): + tg.level = _g.level + + if _g.is_parent_affinty(tg.vid): + _g.subgroups[tg.vid] = tg + tg.surgroup = _g + _affinity_map[tg.vid] = _g + + self._add_implicit_diversity_groups(_g, tg.diversity_groups) + self._add_implicit_quorum_diversity_groups(_g, tg.quorum_diversity_groups) + self._add_implicit_exclusivity_groups(_g, tg.exclusivity_groups) + self._add_memberships(_g, tg) + + del _groups[tg.vid] diff --git a/engine/src/valet/engine/app_manager/group.py b/engine/src/valet/engine/app_manager/group.py new file mode 100644 index 0000000..69c9339 --- /dev/null +++ b/engine/src/valet/engine/app_manager/group.py @@ -0,0 +1,139 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +import six + + +LEVEL = ["host", "rack", "cluster"] + +FLAVOR_TYPES = ["gv", "nv", "nd", "ns", "ss"] + + +class Group(object): + """Container to keep requested valet groups. + + TODO(Gueyoung): create parent class to make common functions + """ + + def __init__(self, _id): + """Define group info and parameters.""" + + # ID consists of + # datacenter_id + [service_instance_id] + [vnf_instance_id] + rule_name + self.vid = _id + + # Type is either affinity, diversity, quorum_diversity, or exclusivity + self.group_type = None + + self.factory = None + + # Level is host or rack + self.level = None + + # To build containment tree + self.surgroup = None # parent 'affinity' object + self.subgroups = {} # child affinity group (or server) objects + + # Group objects of this container + self.diversity_groups = {} + self.quorum_diversity_groups = {} + self.exclusivity_groups = {} + + self.availability_zone_list = [] + + self.extra_specs_list = [] + + self.vCPUs = 0 + self.mem = 0 # MB + self.local_volume_size = 0 # GB + + self.vCPU_weight = -1 + self.mem_weight = -1 + self.local_volume_weight = -1 + + # To record which servers (e.g., VMs) in given request are assigned + # to this group. + self.server_list = [] + + self.sort_base = -1 + + def get_exclusivities(self, _level): + """Return exclusivity group requested with a level (host or rack). + + Note: each affinity group must have a single exclusivity group of the level. 
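+        Only exclusivity groups whose level matches _level are returned,
+        keyed by group id.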
+ """ + + exclusivities = {} + + for exk, group in self.exclusivity_groups.iteritems(): + if group.level == _level: + exclusivities[exk] = group + + return exclusivities + + def need_numa_alignment(self): + """Check if this server requires NUMA alignment.""" + + if len(self.extra_specs_list) > 0: + for es in self.extra_specs_list: + for key, req in six.iteritems(es): + if key == "hw:numa_nodes" and req == 1: + return True + + return False + + def is_parent_affinity(self, _vk): + """Check recursively if _vk is located in the group.""" + + exist = False + + for sgk, sg in self.subgroups.iteritems(): + if sgk == _vk: + exist = True + break + + if isinstance(sg, Group): + if sg.is_parent_affinity(_vk): + exist = True + break + + return exist + + def get_servers(self, _servers): + """Get all child servers.""" + + for _, sg in self.subgroups.iteritems(): + if isinstance(sg, Group): + sg.get_servers(_servers) + else: + if sg not in _servers: + _servers.append(sg) + + def get_flavor_types(self): + flavor_type_list = [] + + for extra_specs in self.extra_specs_list: + for k, v in extra_specs.iteritems(): + k_elements = k.split(':') + if len(k_elements) > 1: + if k_elements[0] == "aggregate_instance_extra_specs": + if k_elements[1].lower() in FLAVOR_TYPES: + if v == "true": + flavor_type_list.append(k_elements[1]) + + return flavor_type_list diff --git a/engine/src/valet/engine/app_manager/server.py b/engine/src/valet/engine/app_manager/server.py new file mode 100644 index 0000000..9052285 --- /dev/null +++ b/engine/src/valet/engine/app_manager/server.py @@ -0,0 +1,171 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +import six + +from valet.engine.app_manager.group import FLAVOR_TYPES + + +class Server(object): + """Container to keep requested server (e.g., VM).""" + + def __init__(self, _id, _orch_id): + """Define server info and parameters.""" + + # ID consists of stack_name + ":" + server_name, + # where stack_name = datacenter_id + ":" + tenant_id + ":" + vf_module_name + self.vid = _id + + # ID in stack + self.orch_id = _orch_id + + # one given in Heat stack + self.name = None + + # Affinity group object + self.surgroup = None + + self.diversity_groups = {} + self.quorum_diversity_groups = {} + self.exclusivity_groups = {} + + self.availability_zone = None + + self.flavor = None + self.image = None + + self.vCPUs = 0 + self.mem = 0 # MB + self.local_volume_size = 0 # GB + self.extra_specs_list = [] + + self.vCPU_weight = -1 + self.mem_weight = -1 + self.local_volume_weight = -1 + + # To return placement. 
+ # If stack is nested, should point the index to distinguish + self.host_assignment_variable = None + self.host_assignment_inx = -1 + + # Placement result + self.host_group = None # e.g., rack + self.host = None + self.numa = None + + # Request state is 'create', 'migrate', 'rebuild', 'delete' + # 'created', 'migrated', 'rebuilt' + self.state = "plan" + + # To inform if the current placement violates rules and requirements + self.status = "valid" + + self.sort_base = -1 + + def get_exclusivities(self, _level): + """Return exclusivity group requested with a level (host or rack). + + Note: each server must have a single exclusivity group of the level. + """ + + exclusivities = {} + + for exk, group in self.exclusivity_groups.iteritems(): + if group.level == _level: + exclusivities[exk] = group + + return exclusivities + + def need_numa_alignment(self): + """Check if this server requires NUMA alignment.""" + + if len(self.extra_specs_list) > 0: + for es in self.extra_specs_list: + for key, req in six.iteritems(es): + if key == "hw:numa_nodes" and int(req) == 1: + return True + + return False + + def get_flavor_types(self): + flavor_type_list = [] + + for extra_specs in self.extra_specs_list: + for k, v in extra_specs.iteritems(): + k_elements = k.split(':') + if len(k_elements) > 1: + if k_elements[0] == "aggregate_instance_extra_specs": + if k_elements[1].lower() in FLAVOR_TYPES: + if v == "true": + flavor_type_list.append(k_elements[1]) + + return flavor_type_list + + def get_json_info(self): + """Format server info as JSON.""" + + if self.surgroup is None: + surgroup_id = "none" + else: + surgroup_id = self.surgroup.vid + + diversity_groups = [] + for divk in self.diversity_groups.keys(): + diversity_groups.append(divk) + + quorum_diversity_groups = [] + for divk in self.quorum_diversity_groups.keys(): + quorum_diversity_groups.append(divk) + + exclusivity_groups = [] + for exk in self.exclusivity_groups.keys(): + exclusivity_groups.append(exk) + + if self.availability_zone is None: + availability_zone = "none" + else: + availability_zone = self.availability_zone + + if self.host_group is None: + host_group = "none" + else: + host_group = self.host_group + + if self.numa is None: + numa = "none" + else: + numa = self.numa + + return {'name': self.name, + 'orch_id': self.orch_id, + 'surgroup': surgroup_id, + 'diversity_groups': diversity_groups, + 'quorum_diversity_groups': quorum_diversity_groups, + 'exclusivity_groups': exclusivity_groups, + 'availability_zones': availability_zone, + 'extra_specs_list': self.extra_specs_list, + 'flavor': self.flavor, + 'image': self.image, + 'cpus': self.vCPUs, + 'mem': self.mem, + 'local_volume': self.local_volume_size, + 'host_group': host_group, + 'host': self.host, + 'numa': numa, + 'state': self.state, + 'status': self.status} diff --git a/engine/src/valet/engine/resource_manager/__init__.py b/engine/src/valet/engine/resource_manager/__init__.py new file mode 100644 index 0000000..bd50995 --- /dev/null +++ b/engine/src/valet/engine/resource_manager/__init__.py @@ -0,0 +1,18 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# diff --git a/engine/src/valet/engine/resource_manager/compute_manager.py b/engine/src/valet/engine/resource_manager/compute_manager.py new file mode 100644 index 0000000..81a95ee --- /dev/null +++ b/engine/src/valet/engine/resource_manager/compute_manager.py @@ -0,0 +1,201 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +#!/bin/python + + +from valet.engine.resource_manager.resources.host import Host + + +class ComputeManager(object): + """Resource Manager to maintain compute host resources.""" + + def __init__(self, _source, _logger): + """Define compute hosts and server allocations.""" + + self.source = _source + + self.hosts = {} + + self.logger = _logger + + def get_hosts(self, _resource): + """Check any inconsistency and perform garbage collection if necessary.""" + + self.logger.info("set compute hosts...") + + # Init first + self.hosts.clear() + + # Get available hosts only + if self.source.get_hosts(self.hosts) != "ok": + self.logger.warning("fail to set hosts from source (e.g., nova)") + return False + + # Get servers + if self.source.get_servers_in_hosts(self.hosts) != "ok": + self.logger.warning("fail to set servers from source (e.g., nova)") + return False + + self._check_host_updated(_resource) + + self._check_server_placements(_resource) + + return True + + def _check_host_updated(self, _resource): + """Check if hosts and their properties are changed.""" + + for hk in self.hosts.keys(): + if hk not in _resource.hosts.keys(): + _resource.hosts[hk] = Host(hk) + _resource.mark_host_updated(hk) + + self.logger.info("new host (" + hk + ") added") + + for rhk in _resource.hosts.keys(): + if rhk not in self.hosts.keys(): + _resource.hosts[rhk].status = "disabled" + _resource.mark_host_updated(rhk) + + self.logger.info("host (" + rhk + ") disabled") + + for hk in self.hosts.keys(): + host = self.hosts[hk] + rhost = _resource.hosts[hk] + + if self._is_host_resources_updated(host, rhost): + _resource.mark_host_updated(hk) + + def _is_host_resources_updated(self, _host, _rhost): + """Check the resource amount consistency.""" + + resource_updated = False + + if _host.original_vCPUs != _rhost.original_vCPUs: + _rhost.original_vCPUs = _host.original_vCPUs + + self.logger.info("host (" + _rhost.name + ") updated (origin CPU updated)") + resource_updated = True + + if _host.vCPUs_used != 
_rhost.vCPUs_used: + _rhost.vCPUs_used = _host.vCPUs_used + + self.logger.info("host (" + _rhost.name + ") updated (CPU updated)") + resource_updated = True + + if _host.original_mem_cap != _rhost.original_mem_cap: + _rhost.original_mem_cap = _host.original_mem_cap + + self.logger.info("host (" + _rhost.name + ") updated (origin mem updated)") + resource_updated = True + + if _host.free_mem_mb != _rhost.free_mem_mb: + _rhost.free_mem_mb = _host.free_mem_mb + + self.logger.info("host (" + _rhost.name + ") updated (mem updated)") + resource_updated = True + + if _host.original_local_disk_cap != _rhost.original_local_disk_cap: + _rhost.original_local_disk_cap = _host.original_local_disk_cap + + self.logger.info("host (" + _rhost.name + ") updated (origin disk updated)") + resource_updated = True + + if _host.free_disk_gb != _rhost.free_disk_gb: + _rhost.free_disk_gb = _host.free_disk_gb + + self.logger.info("host (" + _rhost.name + ") updated (local disk space updated)") + resource_updated = True + + if _host.disk_available_least != _rhost.disk_available_least: + _rhost.disk_available_least = _host.disk_available_least + + self.logger.info("host (" + _rhost.name + ") updated (least disk space updated)") + resource_updated = True + + return resource_updated + + def _check_server_placements(self, _resource): + """Check the consistency of server placements with nova.""" + + # To keep how server placements changed. + # key = + # If uuid is available, uuid + # Else stack_id:name + # value = {new_host, old_host, server_info} + # The server's state must be either 'created', 'migrated', or 'rebuilt'. + # That is, deal with only the server which placement decision is confirmed. + # If value of new_host (from nova) exists but not for old_host (valet), + # the server is unknown one to valet. + # If value of new_host not exists but exist for old_host, + # the server is deleted by nova. + # If value exists both in new_host and old_host, + # the server is moved from old to new host. + # If value not exist neither, + # the server is placed as planned. + change_of_placements = {} + + for hk, host in self.hosts.iteritems(): + rhost = _resource.hosts[hk] + + for s_info in host.server_list: + if s_info["stack_id"] != "none": + sid = s_info["stack_id"] + ":" + s_info["name"] + else: + sid = s_info["uuid"] + + change_of_placements[sid] = {} + change_of_placements[sid]["info"] = s_info + + if not rhost.has_server(s_info): + change_of_placements[sid]["new_host"] = hk + + self.logger.info("host (" + hk + ") updated (server added)") + else: + change_of_placements[sid]["host"] = hk + + for rhk, rhost in _resource.hosts.iteritems(): + if not rhost.is_available(): + continue + + host = self.hosts[rhk] + + for s_info in rhost.server_list: + # Deal with confirmed placements only. 
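+                # i.e., servers whose state is already 'created', 'migrated',
+                # or 'rebuilt'.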
+ if s_info["state"] not in ("created", "migrated", "rebuilt"): + continue + + if s_info["stack_id"] != "none": + sid = s_info["stack_id"] + ":" + s_info["name"] + else: + sid = s_info["uuid"] + + if not host.has_server(s_info): + if sid in change_of_placements.keys(): + change_of_placements[sid]["old_host"] = rhk + + self.logger.info("server (" + sid + ") is migrated`") + else: + change_of_placements[sid] = {} + change_of_placements[sid]["old_host"] = rhk + change_of_placements[sid]["info"] = s_info + + self.logger.info("server (" + sid + ") is deleted") + + _resource.change_of_placements = change_of_placements diff --git a/engine/src/valet/engine/resource_manager/metadata_manager.py b/engine/src/valet/engine/resource_manager/metadata_manager.py new file mode 100644 index 0000000..f34e3f0 --- /dev/null +++ b/engine/src/valet/engine/resource_manager/metadata_manager.py @@ -0,0 +1,424 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +#!/bin/python + + +import json + +from copy import deepcopy + + +class MetadataManager(object): + """Metadata Manager to maintain flavors and groups.""" + + def __init__(self, _source, _logger): + self.source = _source + + self.groups = {} + + self.flavors = {} + + self.logger = _logger + + def get_groups(self, _resource): + """Set groups (availability-zones, host-aggregates, server groups) + + from platform (e.g., nova). + """ + + self.logger.info("set metadata (groups)...") + + # Init first + self.groups.clear() + + # Get enabled groups only + if self.source.get_groups(self.groups) != "ok": + self.logger.warning("fail to set groups from source (e.g., nova)") + return False + + self._check_group_updated(_resource) + + self._check_host_memberships_updated(_resource) + + return True + + def _check_group_updated(self, _resource): + """Check any inconsistency for groups.""" + + for gk in self.groups.keys(): + if gk not in _resource.groups.keys(): + _resource.groups[gk] = deepcopy(self.groups[gk]) + _resource.groups[gk].updated = True + + self.logger.info("new group (" + gk + ") added") + + for rgk in _resource.groups.keys(): + rg = _resource.groups[rgk] + + if rg.factory != "valet": + if rgk not in self.groups.keys(): + rg.status = "disabled" + rg.updated = True + + self.logger.info("group (" + rgk + ") disabled") + + for gk in self.groups.keys(): + g = self.groups[gk] + rg = _resource.groups[gk] + + if rg.uuid is None and g.uuid is not None: + rg.uuid = g.uuid + rg.updated = True + + self.logger.info("group (" + gk + ") uuid updated") + + # TODO: Clean up resource.hosts if each is not in any AZ members. 
+ + if g.group_type == "aggr": + if not gk.startswith("valet:"): + if self._is_group_metadata_updated(g, rg): + rg.updated = True + + self.logger.info("group (" + gk + ") metadata updated") + + if g.group_type == "az" or g.group_type == "aggr": + if self._is_member_hosts_updated(g, _resource): + rg.updated = True + + self.logger.info("group (" + gk + ") member hosts updated") + + if g.factory == "server-group": + if self._is_new_servers(g, rg): + rg.updated = True + + self.logger.info("group (" + gk + ") server_list updated") + + def _is_group_metadata_updated(self, _g, _rg): + """Check any change in metadata of group.""" + + updated = False + + for mdk in _g.metadata.keys(): + if mdk not in _rg.metadata.keys(): + _rg.metadata[mdk] = _g.metadata[mdk] + updated = True + + for rmdk in _rg.metadata.keys(): + if rmdk not in _g.metadata.keys(): + del _rg.metadata[rmdk] + updated = True + + for mdk in _g.metadata.keys(): + mdv = _g.metadata[mdk] + rmdv = _rg.metadata[mdk] + if mdv != rmdv: + _rg.metadata[mdk] = mdv + updated = True + + return updated + + def _is_member_hosts_updated(self, _g, _resource): + """Check any change in member hosts of group.""" + + updated = False + + _rg = _resource.groups[_g.name] + + for hk in _g.member_hosts.keys(): + if hk not in _rg.member_hosts.keys(): + if hk in _resource.hosts.keys(): + if _resource.hosts[hk].is_available(): + _rg.member_hosts[hk] = deepcopy(_g.member_hosts[hk]) + updated = True + # else not needed + + for rhk in _rg.member_hosts.keys(): + if rhk not in _resource.hosts.keys() or \ + not _resource.hosts[rhk].is_available() or \ + rhk not in _g.member_hosts.keys(): + del _rg.member_hosts[rhk] + updated = True + + return updated + + def _is_new_servers(self, _g, _rg): + """Check if there is any new server.""" + + updated = False + + for s_info in _g.server_list: + exist = False + for rs_info in _rg.server_list: + if rs_info.get("uuid") == s_info.get("uuid"): + exist = True + break + + if not exist: + _rg.server_list.append(s_info) + updated = True + + return updated + + def _check_host_memberships_updated(self, _resource): + """Check host memberships consistency.""" + + for gk, g in _resource.groups.iteritems(): + # Other group types will be handled later + if g.factory != "valet" and g.status == "enabled": + for hk in g.member_hosts.keys(): + host = _resource.hosts[hk] + if gk not in host.memberships.keys() or g.updated: + host.memberships[gk] = g + _resource.mark_host_updated(hk) + + self.logger.info("host (" + hk + ") updated (update membership - " + gk + ")") + + for hk, host in _resource.hosts.iteritems(): + if host.is_available(): + for gk in host.memberships.keys(): + if gk in _resource.groups.keys(): + g = _resource.groups[gk] + if g.factory != "valet": + if g.status == "enabled": + if g.updated: + host.memberships[gk] = g + _resource.mark_host_updated(hk) + + self.logger.info("host (" + hk + ") updated (update membership - " + gk + ")") + else: + del host.memberships[gk] + _resource.mark_host_updated(hk) + + self.logger.info("host (" + hk + ") updated (remove membership - " + gk + ")") + else: + del host.memberships[gk] + _resource.mark_host_updated(hk) + + self.logger.info("host (" + hk + ") updated (remove membership - " + gk + ")") + + def create_exclusive_aggregate(self, _group, _hosts): + """Set Host-Aggregate to apply Exclusivity.""" + + az = _hosts[0].get_availability_zone() + + # To remove 'az:' header from name + az_name_elements = az.name.split(':', 1) + if len(az_name_elements) > 1: + az_name = az_name_elements[1] + else: + 
az_name = az.name + + status = self.source.set_aggregate(_group.name, az_name) + if status != "ok": + return status + + self.logger.debug("dynamic host-aggregate(" + _group.name + ") created") + + aggregates = {} + status = self.source.get_aggregates(aggregates) + if status != "ok": + return status + + if _group.name in aggregates.keys(): + _group.uuid = aggregates[_group.name].uuid + + if len(_group.metadata) > 0: + metadata = {} + for mk, mv in _group.metadata.iteritems(): + if mk == "prior_metadata": + metadata[mk] = json.dumps(mv) + else: + metadata[mk] = mv + + status = self.source.set_metadata_of_aggregate(_group.uuid, metadata) + if status != "ok": + return status + + self.logger.debug("dynamic host-aggregate(" + _group.name + ") metadata created") + + for host in _hosts: + if host.name in _group.metadata.keys(): + aggr_uuids = _group.metadata[host.name].split(',') + + for uuid in aggr_uuids: + status = self.source.remove_host_from_aggregate(int(uuid), host.name) + if status != "ok": + return status + + self.logger.debug("host-aggregate(" + uuid + ") host(" + host.name + ") removed") + + status = self.source.add_host_to_aggregate(_group.uuid, host.name) + if status != "ok": + return status + + self.logger.debug("dynamic host-aggregate(" + _group.name + ") host(" + host.name + ") added") + else: + status = "dynamic host-aggregate not found" + self.logger.error(status) + return status + + return "ok" + + def update_exclusive_aggregate(self, _id, _metadata, _host, _old_aggregates): + """Update Host-Aggregate to apply Exclusivity.""" + + if len(_metadata) > 0: + metadata = {} + for mk, mv in _metadata.iteritems(): + if mk == "prior_metadata": + metadata[mk] = json.dumps(mv) + else: + metadata[mk] = mv + + status = self.source.set_metadata_of_aggregate(_id, metadata) + if status != "ok": + return status + + self.logger.debug("dynamic host-aggregate(" + str(_id) + ") metadata updated") + + for oa in _old_aggregates: + status = self.source.remove_host_from_aggregate(oa.uuid, _host) + if status != "ok": + return status + + self.logger.debug("host-aggregate(" + oa.name + ") host(" + _host + ") removed") + + status = self.source.add_host_to_aggregate(_id, _host) + if status != "ok": + return status + + self.logger.debug("dynamic host-aggregate(" + str(_id) + ") host(" + _host + ") added") + + return "ok" + + def remove_host_from_exclusive_aggregate(self, _id, _metadata, _host, _old_aggregates): + """Remove host from Host-Aggregate to apply Exclusivity.""" + + if len(_metadata) > 0: + metadata = {} + for mk, mv in _metadata.iteritems(): + if mk == "prior_metadata": + metadata[mk] = json.dumps(mv) + else: + metadata[mk] = mv + + status = self.source.set_metadata_of_aggregate(_id, metadata) + if status != "ok": + return status + + self.logger.debug("dynamic host-aggregate(" + str(_id) + ") metadata updated") + + status = self.source.remove_host_from_aggregate(_id, _host) + if status != "ok": + return status + + self.logger.debug("dynamic host-aggregate(" + str(_id) + ") host(" + _host + ") removed") + + for oa in _old_aggregates: + status = self.source.add_host_to_aggregate(oa.uuid, _host) + if status != "ok": + return status + + self.logger.debug("host-aggregate(" + oa.name + ") host(" + _host + ") added") + + return "ok" + + def remove_exclusive_aggregate(self, _id): + """Remove Host-Aggregate.""" + + status = self.source.delete_aggregate(_id) + if status != "ok": + return status + + self.logger.debug("dynamic host-aggregate(" + str(_id) + ") removed") + + return "ok" + + def get_flavors(self, 
_resource): + """Set flavors from nova.""" + + self.logger.info("set metadata (flavors)...") + + # Init first + self.flavors.clear() + + # Get enabled flavors only + if self.source.get_flavors(self.flavors, detailed=False) != "ok": + return False + + self._check_flavor_update(_resource, False) + + return True + + def _check_flavor_update(self, _resource, _detailed): + """Check flavor info consistency.""" + + for fk in self.flavors.keys(): + if fk not in _resource.flavors.keys(): + _resource.flavors[fk] = deepcopy(self.flavors[fk]) + _resource.flavors[fk].updated = True + + self.logger.info("new flavor (" + fk + ":" + self.flavors[fk].flavor_id + ") added") + + for rfk in _resource.flavors.keys(): + rf = _resource.flavors[rfk] + if rfk not in self.flavors.keys(): + rf.status = "disabled" + rf.updated = True + + self.logger.info("flavor (" + rfk + ":" + rf.flavor_id + ") removed") + + if _detailed: + for fk in self.flavors.keys(): + f = self.flavors[fk] + rf = _resource.flavors[fk] + if self._is_flavor_spec_updated(f, rf): + rf.updated = True + + self.logger.info("flavor (" + fk + ":" + rf.flavor_id + ") spec updated") + + def _is_flavor_spec_updated(self, _f, _rf): + """Check flavor's spec consistency.""" + + spec_updated = False + + if _f.vCPUs != _rf.vCPUs or _f.mem_cap != _rf.mem_cap or _f.disk_cap != _rf.disk_cap: + _rf.vCPUs = _f.vCPUs + _rf.mem_cap = _f.mem_cap + _rf.disk_cap = _f.disk_cap + spec_updated = True + + for sk in _f.extra_specs.keys(): + if sk not in _rf.extra_specs.keys(): + _rf.extra_specs[sk] = _f.extra_specs[sk] + spec_updated = True + + for rsk in _rf.extra_specs.keys(): + if rsk not in _f.extra_specs.keys(): + del _rf.extra_specs[rsk] + spec_updated = True + + for sk in _f.extra_specs.keys(): + sv = _f.extra_specs[sk] + rsv = _rf.extra_specs[sk] + if sv != rsv: + _rf.extra_specs[sk] = sv + spec_updated = True + + return spec_updated diff --git a/engine/src/valet/engine/resource_manager/naming.py b/engine/src/valet/engine/resource_manager/naming.py new file mode 100644 index 0000000..bdf5211 --- /dev/null +++ b/engine/src/valet/engine/resource_manager/naming.py @@ -0,0 +1,146 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# ------------------------------------------------------------------------- +# +#!/bin/python + + +import copy +import re + +from sre_parse import isdigit +from valet.engine.resource_manager.resources.host_group import HostGroup + + +class Naming(object): + """Using cannonical naming convention to capture datacenter layout.""" + + def __init__(self, _config, _logger): + self.logger = _logger + + self.rack_code_list = _config.get("rack_codes") + self.host_code_list = _config.get("host_codes") + + def get_topology(self, _datacenter, _host_groups, _hosts, _rhosts): + """Set datacenter resource structure (racks, hosts).""" + + status = "ok" + + for rhk, rhost in _rhosts.iteritems(): + h = copy.deepcopy(rhost) + + (rack_name, parsing_status) = self._set_layout_by_name(rhk) + if parsing_status != "ok": + self.logger.warning(parsing_status + " in host_name (" + rhk + ")") + + if rack_name == "none": + h.host_group = _datacenter + _datacenter.resources[rhk] = h + else: + if rack_name not in _host_groups.keys(): + host_group = HostGroup(rack_name) + host_group.host_type = "rack" + _host_groups[host_group.name] = host_group + else: + host_group = _host_groups[rack_name] + + h.host_group = host_group + host_group.child_resources[rhk] = h + + _hosts[h.name] = h + + for hgk, hg in _host_groups.iteritems(): + hg.parent_resource = _datacenter + _datacenter.resources[hgk] = hg + + if "none" in _host_groups.keys(): + self.logger.warning("some hosts are into unknown rack") + + return status + + def _set_layout_by_name(self, _host_name): + """Set the rack-host layout, use host nameing convention. + + Naming convention includes + zone name is any word followed by at least one of [0-9] + rack name is rack_code followd by at least one of [0-9] + host name is host_code followed by at least one of [0-9] + an example is + 'abcd_001A' (as a zone_name) + + 'r' (as a rack_code) + '01A' + + 'c' (as a host_code) + '001A' + """ + + zone_name = None + rack_name = None + host_name = None + + # To check if zone name follows the rule + index = 0 + for c in _host_name: + if isdigit(c): + break + index += 1 + zone_indicator = _host_name[index:] + if len(zone_indicator) == 0: + return 'none', "no numberical digit in name" + + # To extract rack indicator + for rack_code in self.rack_code_list: + rack_index_list = [rc.start() for rc in re.finditer(rack_code, zone_indicator)] + + start_of_rack_index = -1 + for rack_index in rack_index_list: + rack_prefix = rack_index + len(rack_code) + if rack_prefix > len(zone_indicator): + continue + + # Once rack name follows the rule + if isdigit(zone_indicator[rack_prefix]): + rack_indicator = zone_indicator[rack_prefix:] + + # To extract host indicator + for host_code in self.host_code_list: + host_index_list = [hc.start() for hc in re.finditer(host_code, rack_indicator)] + + start_of_host_index = -1 + for host_index in host_index_list: + host_prefix = host_index + len(host_code) + if host_prefix > len(rack_indicator): + continue + + if isdigit(rack_indicator[host_prefix]): + host_name = rack_indicator[host_index:] + start_of_host_index = rack_index + host_index + 1 + break + + if host_name is not None: + rack_name = zone_indicator[rack_index:start_of_host_index] + break + + if rack_name is not None: + start_of_rack_index = index + rack_index + break + + if rack_name is not None: + zone_name = _host_name[:start_of_rack_index] + break + + if rack_name is None: + return 'none', "no host or rack name found in " + _host_name + else: + return zone_name + rack_name, "ok" diff --git 
a/engine/src/valet/engine/resource_manager/nova_compute.py b/engine/src/valet/engine/resource_manager/nova_compute.py new file mode 100644 index 0000000..6887eb8 --- /dev/null +++ b/engine/src/valet/engine/resource_manager/nova_compute.py @@ -0,0 +1,544 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +import json
+import time
+import traceback
+
+from novaclient import client as nova_client
+
+from valet.engine.resource_manager.resources.flavor import Flavor
+from valet.engine.resource_manager.resources.group import Group
+from valet.engine.resource_manager.resources.host import Host
+from valet.utils.decryption import decrypt
+
+
+# Nova API version
+VERSION = 2
+
+
+# noinspection PyBroadException
+class NovaCompute(object):
+ """Source to collect resource status (i.e., OpenStack Nova).
+
+ Manipulate Host-Aggregates with Valet placement decisions.
+ """
+
+ def __init__(self, _config, _logger):
+ self.logger = _logger
+
+ self.nova = None
+
+ self.novas = {}
+ self.last_activate_urls = {}
+ self.life_time = 43200 # 12 hours
+
+ # TODO(Gueyoung): handle both admin and admin_view accounts.
+
+ pw = decrypt(_config["engine"]["ek"],
+ _config["logging"]["lk"],
+ _config["db"]["dk"],
+ _config["nova"]["admin_view_password"])
+
+ self.admin_username = _config["nova"]["admin_view_username"]
+ self.admin_password = pw
+ self.project = _config["nova"]["project_name"]
+
+ def set_client(self, _auth_url):
+ """Set nova client."""
+
+ try:
+ # TODO: add timeout=_timeout?
+ self.novas[_auth_url] = nova_client.Client(VERSION,
+ self.admin_username,
+ self.admin_password,
+ self.project,
+ _auth_url)
+
+ self.last_activate_urls[_auth_url] = time.time()
+
+ self.nova = self.novas[_auth_url]
+ return True
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return False
+
+ def valid_client(self, _auth_url):
+ """Check if nova connection is valid."""
+
+ if _auth_url not in self.novas.keys():
+ return False
+
+ if _auth_url not in self.last_activate_urls.keys():
+ return False
+
+ elapsed_time = time.time() - self.last_activate_urls[_auth_url]
+
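+ # Treat a cached client older than self.life_time (12 hours) as expired;
+ # the caller is then expected to call set_client() again to refresh it.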
+ if elapsed_time > self.life_time:
+ return False
+
+ self.nova = self.novas[_auth_url]
+
+ return True
+
+ def get_groups(self, _groups):
+ """Get server-groups, availability-zones and host-aggregates
+
+ from OpenStack Nova.
+ """
+
+ status = self._get_availability_zones(_groups)
+ if status != "ok":
+ self.logger.error(status)
+ return status
+
+ status = self.get_aggregates(_groups)
+ if status != "ok":
+ self.logger.error(status)
+ return status
+
+ status = self._get_server_groups(_groups)
+ if status != "ok":
+ self.logger.error(status)
+ return status
+
+ return "ok"
+
+ def _get_availability_zones(self, _groups):
+ """Set AZs."""
+
+ try:
+ # TODO: try hosts_list = self.nova.hosts.list()?
+
+ az_list = self.nova.availability_zones.list(detailed=True)
+
+ for a in az_list:
+ if a.zoneState["available"]:
+ # NOTE(Gueyoung): add 'az:' to avoid conflict with
+ # Host-Aggregate name.
+ az_id = "az:" + a.zoneName
+
+ az = Group(az_id)
+
+ az.group_type = "az"
+ az.factory = "nova"
+ az.level = "host"
+
+ # TODO: Get AZ first with init Compute Hosts?
+
+ for hk, h_info in a.hosts.iteritems():
+ if "nova-compute" in h_info.keys():
+ if h_info["nova-compute"]["active"] and \
+ h_info["nova-compute"]["available"]:
+ az.member_hosts[hk] = []
+
+ _groups[az_id] = az
+
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while setting availability-zones from Nova"
+
+ return "ok"
+
+ def get_aggregates(self, _groups):
+ """Set host-aggregates and corresponding hosts."""
+
+ try:
+ aggregate_list = self.nova.aggregates.list()
+
+ for a in aggregate_list:
+ if not a.deleted:
+ aggregate = Group(a.name)
+
+ aggregate.uuid = a.id
+
+ aggregate.group_type = "aggr"
+ aggregate.factory = "nova"
+ aggregate.level = "host"
+
+ metadata = {}
+ for mk in a.metadata.keys():
+ if mk == "prior_metadata":
+ metadata[mk] = json.loads(a.metadata.get(mk))
+ else:
+ metadata[mk] = a.metadata.get(mk)
+ aggregate.metadata = metadata
+
+ for hn in a.hosts:
+ aggregate.member_hosts[hn] = []
+
+ _groups[aggregate.name] = aggregate
+
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while setting host-aggregates from Nova"
+
+ return "ok"
+
+ def set_aggregate(self, _name, _az):
+ """Create a Host-Aggregate."""
+
+ try:
+ self.nova.aggregates.create(_name, _az)
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while setting a host-aggregate in Nova"
+
+ return "ok"
+
+ def add_host_to_aggregate(self, _aggr, _host):
+ """Add a Host into the Host-Aggregate."""
+
+ try:
+ self.nova.aggregates.add_host(_aggr, _host)
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while adding a host into host-aggregate in Nova"
+
+ return "ok"
+
+ def delete_aggregate(self, _aggr):
+ """Delete the Host-Aggregate."""
+
+ try:
+ self.nova.aggregates.delete(_aggr)
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while deleting host-aggregate from Nova"
+
+ return "ok"
+
+ def remove_host_from_aggregate(self, _aggr, _host):
+ """Remove the Host from the Host-Aggregate."""
+
+ try:
+ self.nova.aggregates.remove_host(_aggr, _host)
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while removing host from host-aggregate in Nova"
+
+ return "ok"
+
+ def set_metadata_of_aggregate(self, _aggr, _metadata):
+ """Set metadata.
+
+ Note that Nova merges the given key/value pairs into the existing metadata rather than replacing it.
+ """
+
+ try:
+ self.nova.aggregates.set_metadata(_aggr, _metadata)
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while setting metadata of host-aggregate in Nova"
+
+ return "ok"
+
+ def _get_server_groups(self, _groups):
+ """Set host-aggregates and corresponding hosts."""
+
+ try:
+ # NOTE(Gueyoung): novaclient v2.18.0 does not have 'all_projects=True' param.
+ server_group_list = self.nova.server_groups.list()
+
+ for g in server_group_list:
+ server_group = Group(g.name)
+
+ server_group.uuid = g.id
+
+ # TODO: Check len(g.policies) == 1
+ # policy is either 'affinity', 'anti-affinity', 'soft-affinity',
+ # or 'soft-anti-affinity'
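+ # Mapping used below: 'anti-affinity' becomes Valet's 'diversity' type,
+ # while any other policy keeps its Nova name (e.g., 'affinity').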
+ if g.policies[0] == "anti-affinity":
+ server_group.group_type = "diversity"
+ else:
+ server_group.group_type = g.policies[0]
+ server_group.factory = "server-group"
+ server_group.level = "host"
+
+ # Members attribute is a list of server uuids
+ for s_uuid in g.members:
+ s_info = {}
+ s_info["stack_id"] = "none"
+ s_info["stack_name"] = "none"
+ s_info["uuid"] = s_uuid
+ s_info["orch_id"] = "none"
+ s_info["name"] = "none"
+ s_info["flavor_id"] = "none"
+ s_info["vcpus"] = -1
+ s_info["mem"] = -1
+ s_info["disk"] = -1
+ s_info["numa"] = "none"
+ s_info["image_id"] = "none"
+ s_info["tenant_id"] = "none"
+ s_info["state"] = "created"
+ s_info["status"] = "valid"
+
+ server_group.server_list.append(s_info)
+
+ # TODO: Check for duplicate names, as name is used as the group identifier
+ _groups[server_group.name] = server_group
+
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while setting server-groups from Nova"
+
+ return "ok"
+
+ def get_hosts(self, _hosts):
+ """Set host resources info."""
+
+ # TODO: Deprecated as of version 2.43
+ status = self._get_hosts(_hosts)
+ if status != "ok":
+ self.logger.error(status)
+ return status
+
+ status = self._get_host_details(_hosts)
+ if status != "ok":
+ self.logger.error(status)
+ return status
+
+ return "ok"
+
+ # TODO: Deprecated as of version 2.43
+ def _get_hosts(self, _hosts):
+ """Init hosts."""
+
+ try:
+ host_list = self.nova.hosts.list()
+
+ for h in host_list:
+ if h.service == "compute":
+ host = Host(h.host_name)
+ _hosts[host.name] = host
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while setting hosts from Nova"
+
+ return "ok"
+
+ def _get_host_details(self, _hosts):
+ """Get each host's resource status."""
+
+ try:
+ # TODO: marker: UUID of the last host returned, limit: the max number of hosts returned.
+ # with_servers=True
+ host_list = self.nova.hypervisors.list(detailed=True)
+
+ for hv in host_list:
+ if hv.service['host'] in _hosts.keys():
+ if hv.status != "enabled" or hv.state != "up":
+ del _hosts[hv.service['host']]
+ else:
+ host = _hosts[hv.service['host']]
+
+ host.uuid = hv.id
+
+ host.status = hv.status
+ host.state = hv.state
+ host.original_vCPUs = float(hv.vcpus)
+ host.vCPUs_used = float(hv.vcpus_used)
+ host.original_mem_cap = float(hv.memory_mb)
+ host.free_mem_mb = float(hv.free_ram_mb)
+ host.original_local_disk_cap = float(hv.local_gb)
+ host.free_disk_gb = float(hv.free_disk_gb)
+ host.disk_available_least = float(hv.disk_available_least)
+
+ # TODO: cpu_info:topology:sockets
+
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while setting host resources from Nova"
+
+ return "ok"
+
+ def get_servers_in_hosts(self, _hosts):
+ """Set servers in hosts."""
+
+ (status, server_list) = self.get_server_detail()
+ if status != "ok":
+ self.logger.error(status)
+ return status
+
+ for s in server_list:
+ s_info = {}
+
+ if "stack-id" in s.metadata.keys():
+ s_info["stack_id"] = s.metadata["stack-id"]
+ else:
+ s_info["stack_id"] = "none"
+ s_info["stack_name"] = "none"
+
+ s_info["uuid"] = s.id
+
+ s_info["orch_id"] = "none"
+ s_info["name"] = s.name
+
+ s_info["flavor_id"] = s.flavor["id"]
+
+ if "vcpus" in s.flavor.keys():
+ s_info["vcpus"] = s.flavor["vcpus"]
+ s_info["mem"] = s.flavor["ram"]
+ s_info["disk"] = s.flavor["disk"]
+ s_info["disk"] += s.flavor["ephemeral"]
+ s_info["disk"] += s.flavor["swap"] / float(1024)
+ else:
+ s_info["vcpus"] = -1
+ s_info["mem"] = -1
+ s_info["disk"] = -1
+
+ s_info["numa"] = "none"
+
+ try:
+ s_info["image_id"] = s.image["id"]
+ except TypeError:
+ self.logger.warning("In get_servers_in_hosts, expected s.image to have id tag, but it's actually " + s.image)
+ s_info["image_id"] = s.image
+
+ s_info["tenant_id"] = s.tenant_id
+
+ s_info["state"] = "created"
+ s_info["status"] = "valid"
+
+ s_info["host"] = s.__getattr__("OS-EXT-SRV-ATTR:host")
+
+ # s_info["power_state"] = s.__getattr__("OS-EXT-STS:power_state")
+ # s_info["vm_state"] = s.__getattr__("OS-EXT-STS:vm_state")
+ # s_info["task_state"] = s.__getattr__("OS-EXT-STS:task_state")
+
+ if s_info["host"] in _hosts.keys():
+ host = _hosts[s_info["host"]]
+ host.server_list.append(s_info)
+
+ return "ok"
+
+ def get_server_detail(self, project_id=None, host_name=None, server_name=None, uuid=None):
+ """Get the detail of server with search by option."""
+
+ # TODO: Get servers' info in each host
+ # Minimum requirement for server info: s["metadata"]["stack-id"],
+ # More: s["flavor"]["id"], s["tenant_id"]
+ # Maybe: s["image"], server.__getattr__("OS-EXT-AZ:availability_zone"), s["status"]
+ # and scheduler_hints?
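+ # Illustrative call (hypothetical values): fetch all servers of one host
+ #   status, servers = self.get_server_detail(host_name="compute-001")
+ # On success 'servers' is a list of novaclient Server objects; on error
+ # it is None.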
+ try:
+ options = {"all_tenants": 1}
+ if project_id is not None:
+ options["project_id"] = project_id
+ if host_name is not None:
+ options["host"] = host_name
+ if server_name is not None:
+ options["name"] = server_name
+ if uuid is not None:
+ options["uuid"] = uuid
+
+ # TODO: search by vm_state?
+
+ if len(options) > 0:
+ server_list = self.nova.servers.list(detailed=True, search_opts=options)
+ else:
+ server_list = self.nova.servers.list(detailed=True)
+
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while getting server detail from nova", None
+
+ return "ok", server_list
+
+ def get_flavors(self, _flavors, detailed=True):
+ """Get flavors."""
+
+ if detailed:
+ result_status = self._get_flavors(_flavors, True)
+ else:
+ result_status = self._get_flavors(_flavors, False)
+
+ if result_status != "ok":
+ self.logger.error(result_status)
+
+ return result_status
+
+ def _get_flavors(self, _flavors, _detailed):
+ """Get a list of all flavors."""
+
+ try:
+ flavor_list = self.nova.flavors.list(detailed=_detailed)
+
+ for f in flavor_list:
+ flavor = self._set_flavor(f, _detailed)
+ _flavors[flavor.name] = flavor
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while getting flavors"
+
+ # To get non-public flavors.
+ try:
+ flavor_list = self.nova.flavors.list(detailed=_detailed, is_public=False)
+
+ for f in flavor_list:
+ if f.name not in _flavors.keys():
+ flavor = self._set_flavor(f, _detailed)
+ _flavors[flavor.name] = flavor
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return "error while getting flavors"
+
+ return "ok"
+
+ def get_flavor(self, _flavor_id):
+ """Get the flavor."""
+
+ try:
+ f = self.nova.flavors.get(_flavor_id)
+ flavor = self._set_flavor(f, True)
+ except Exception:
+ self.logger.error(traceback.format_exc())
+ return None
+
+ return flavor
+
+ def _set_flavor(self, _f, _detailed):
+ """Set flavor with detailed infomation."""
+
+ flavor = Flavor(_f.name)
+
+ flavor.flavor_id = _f.id
+
+ if _detailed:
+ # NOTE(Gueyoung): This is not allowed with the current credentials.
+ # if getattr(_f, "OS-FLV-DISABLED:disabled"):
+ # flavor.status = "disabled"
+
+ flavor.vCPUs = float(_f.vcpus)
+ flavor.mem_cap = float(_f.ram)
+
+ root_gb = float(_f.disk)
+ ephemeral_gb = 0.0
+ if hasattr(_f, "OS-FLV-EXT-DATA:ephemeral"):
+ ephemeral_gb = float(getattr(_f, "OS-FLV-EXT-DATA:ephemeral"))
+ swap_mb = 0.0
+ if hasattr(_f, "swap"):
+ sw = getattr(_f, "swap")
+ if sw != '':
+ swap_mb = float(sw)
+ flavor.disk_cap = root_gb + ephemeral_gb + swap_mb / float(1024)
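+ # e.g., disk=40GB, ephemeral=10GB, swap=2048MB gives
+ # disk_cap = 40.0 + 10.0 + 2048.0 / 1024.0 = 52.0 (GB)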
+
+ extra_specs = _f.get_keys()
+ for sk, sv in extra_specs.iteritems():
+ flavor.extra_specs[sk] = sv
+
+ return flavor
diff --git a/engine/src/valet/engine/resource_manager/resource.py b/engine/src/valet/engine/resource_manager/resource.py new file mode 100644 index 0000000..0f2b550 --- /dev/null +++ b/engine/src/valet/engine/resource_manager/resource.py @@ -0,0 +1,1589 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +import json +import six +import time + +from valet.engine.app_manager.group import LEVEL +from valet.engine.resource_manager.resources.datacenter import Datacenter +from valet.engine.resource_manager.resources.flavor import Flavor +from valet.engine.resource_manager.resources.group import Group +from valet.engine.resource_manager.resources.host import Host +from valet.engine.resource_manager.resources.host_group import HostGroup +from valet.engine.resource_manager.resources.numa import NUMA + + +class Resource(object): + """Container for resource status of a datacenter and all metadata.""" + + def __init__(self, _datacenter, _dbh, _compute, _metadata, _topology, _logger): + self.dbh = _dbh + + self.compute = _compute + self.metadata = _metadata + self.topology = _topology + + self.group_rules = {} + + self.datacenter = None + self.datacenter_id = _datacenter.get("id") + self.datacenter_url = _datacenter.get("url", "none") + + self.host_groups = {} + self.hosts = {} + + self.change_of_placements = {} + + self.groups = {} + self.flavors = {} + + self.CPU_avail = 0 + self.mem_avail = 0 + self.local_disk_avail = 0 + + self.default_cpu_allocation_ratio = 1.0 + self.default_ram_allocation_ratio = 1.0 + self.default_disk_allocation_ratio = 1.0 + + self.new = False + + # To keep unconfirmed requests. + # If exist, do NOT sync with platform for the next request. + self.pending_requests = [] + + self.logger = _logger + + def set_config(self, _cpu_ratio, _ram_ratio, _disk_ratio): + self.default_cpu_allocation_ratio = _cpu_ratio + self.default_ram_allocation_ratio = _ram_ratio + self.default_disk_allocation_ratio = _disk_ratio + + def set_group_rules(self, _rules): + self.group_rules = _rules + + def load_resource_from_db(self): + """Load datacenter's resource info from DB. + + Note: all resources in DB are enabled ones. + """ + + self.logger.info("load datacenter resource info from DB") + + # Load Valet groups first. 
+ valet_group_list = self.dbh.get_valet_groups() + if valet_group_list is None: + return None + + valet_groups = {} + for vg in valet_group_list: + vgk = vg.get("id") + dc_id = vgk.split(':', 1) + + if dc_id[0] == self.datacenter_id: + if vg["rule_id"] in self.group_rules.keys(): + vg["metadata"] = json.loads(vg["metadata"]) + vg["server_list"] = json.loads(vg["server_list"]) + vg["member_hosts"] = json.loads(vg["member_hosts"]) + vg["group_type"] = vg["type"] + + valet_groups[vgk] = vg + + self._load_groups(valet_groups) + + dcr = self.dbh.get_resource(self.datacenter_id) + if dcr is None: + return None + + if len(dcr) == 0: + return "no resource found for datacenter = " + self.datacenter_id + + if self.datacenter_url == "none": + self.datacenter_url = dcr["url"] + + pending_requests = json.loads(dcr["requests"]) + for req in pending_requests: + self.pending_requests.append(req) + + resource = json.loads(dcr["resource"]) + + groups = resource.get("groups") + if groups: + self._load_groups(groups) + + flavors = resource.get("flavors") + if flavors: + self._load_flavors(flavors) + + if len(self.flavors) == 0: + self.logger.warning("no flavors in db record") + + hosts = resource.get("hosts") + if hosts: + self._load_hosts(hosts) + + if len(self.hosts) == 0: + self.logger.warning("no hosts in db record") + + host_groups = resource.get("host_groups") + if host_groups: + self._load_host_groups(host_groups) + + if len(self.host_groups) == 0: + self.logger.warning("no host_groups (rack)") + + dc = resource.get("datacenter") + self._load_datacenter(dc) + + for ck in dc.get("children"): + if ck in self.host_groups.keys(): + self.datacenter.resources[ck] = self.host_groups[ck] + elif ck in self.hosts.keys(): + self.datacenter.resources[ck] = self.hosts[ck] + + hgs = resource.get("host_groups") + if hgs: + for hgk, hg in hgs.iteritems(): + host_group = self.host_groups[hgk] + + pk = hg.get("parent") + if pk == self.datacenter.name: + host_group.parent_resource = self.datacenter + elif pk in self.host_groups.keys(): + host_group.parent_resource = self.host_groups[pk] + + for ck in hg.get("children"): + if ck in self.hosts.keys(): + host_group.child_resources[ck] = self.hosts[ck] + elif ck in self.host_groups.keys(): + host_group.child_resources[ck] = self.host_groups[ck] + + hs = resource.get("hosts") + if hs: + for hk, h in hs.iteritems(): + host = self.hosts[hk] + + pk = h.get("parent") + if pk == self.datacenter.name: + host.host_group = self.datacenter + elif pk in self.host_groups.keys(): + host.host_group = self.host_groups[pk] + + for _, g in self.groups.iteritems(): + for hk in g.member_hosts.keys(): + if hk not in self.hosts.keys() and \ + hk not in self.host_groups.keys(): + del g.member_hosts[hk] + + self._update_compute_avail() + + return "ok" + + def _load_groups(self, _groups): + """Take JSON group data as defined in /resources/group and + create Group instance. 
+ """ + + for gk, g in _groups.iteritems(): + group = Group(gk) + + group.status = g.get("status") + + group.uuid = g.get("uuid") + + group.group_type = g.get("group_type") + group.level = g.get("level") + group.factory = g.get("factory") + + rule_id = g.get("rule_id") + if rule_id != "none" and rule_id in self.group_rules.keys(): + group.rule = self.group_rules[rule_id] + + for mk, mv in g["metadata"].iteritems(): + group.metadata[mk] = mv + + for s_info in g["server_list"]: + group.server_list.append(s_info) + + for hk, server_list in g["member_hosts"].iteritems(): + group.member_hosts[hk] = [] + for s_info in server_list: + group.member_hosts[hk].append(s_info) + + self.groups[gk] = group + + def _load_flavors(self, _flavors): + """Take JSON flavor data as defined in /resources/flavor and + create Flavor instance. + """ + + for fk, f in _flavors.iteritems(): + flavor = Flavor(fk) + + flavor.status = f.get("status") + + flavor.flavor_id = f.get("flavor_id") + flavor.vCPUs = f.get("vCPUs") + flavor.mem_cap = f.get("mem") + flavor.disk_cap = f.get("disk") + for k, v in f["extra_specs"].iteritems(): + flavor.extra_specs[k] = v + + self.flavors[fk] = flavor + + def _load_hosts(self, _hosts): + """Take JSON host data as defined in /resources/host and + create Host instance. + """ + + for hk, h in _hosts.iteritems(): + host = Host(hk) + + host.status = h.get("status") + host.state = h.get("state") + + host.uuid = h.get("uuid") + + host.vCPUs = h.get("vCPUs") + host.original_vCPUs = h.get("original_vCPUs") + host.vCPUs_used = h.get("vCPUs_used") + host.avail_vCPUs = h.get("avail_vCPUs") + + host.mem_cap = h.get("mem") + host.original_mem_cap = h.get("original_mem") + host.free_mem_mb = h.get("free_mem_mb") + host.avail_mem_cap = h.get("avail_mem") + + host.local_disk_cap = h.get("local_disk") + host.original_local_disk_cap = h.get("original_local_disk") + host.free_disk_gb = h.get("free_disk_gb") + host.disk_available_least = h.get("disk_available_least") + host.avail_local_disk_cap = h.get("avail_local_disk") + + host.NUMA = NUMA(numa=h.get("NUMA")) + + for s_info in h["server_list"]: + host.server_list.append(s_info) + + for gk in h["membership_list"]: + if gk in self.groups.keys(): + host.memberships[gk] = self.groups[gk] + + # Not used by Valet currently, only capacity planning module + if "candidate_host_types" in h.keys(): + for htk, ht in h["candidate_host_types"].iteritems(): + host.candidate_host_types[htk] = ht + else: + host.candidate_host_types = {} + + self.hosts[hk] = host + + def _load_host_groups(self, _host_groups): + for hgk, hg in _host_groups.iteritems(): + host_group = HostGroup(hgk) + + host_group.status = hg.get("status") + + host_group.host_type = hg.get("host_type") + + host_group.vCPUs = hg.get("vCPUs") + host_group.avail_vCPUs = hg.get("avail_vCPUs") + + host_group.mem_cap = hg.get("mem") + host_group.avail_mem_cap = hg.get("avail_mem") + + host_group.local_disk_cap = hg.get("local_disk") + host_group.avail_local_disk_cap = hg.get("avail_local_disk") + + for s_info in hg["server_list"]: + host_group.server_list.append(s_info) + + for gk in hg.get("membership_list"): + if gk in self.groups.keys(): + host_group.memberships[gk] = self.groups[gk] + + self.host_groups[hgk] = host_group + + def _load_datacenter(self, _dc): + self.datacenter = Datacenter(_dc.get("name")) + + self.datacenter.status = _dc.get("status") + + self.datacenter.vCPUs = _dc.get("vCPUs") + self.datacenter.avail_vCPUs = _dc.get("avail_vCPUs") + + self.datacenter.mem_cap = _dc.get("mem") + 
self.datacenter.avail_mem_cap = _dc.get("avail_mem") + + self.datacenter.local_disk_cap = _dc.get("local_disk") + self.datacenter.avail_local_disk_cap = _dc.get("avail_local_disk") + + for s_info in _dc["server_list"]: + self.datacenter.server_list.append(s_info) + + for gk in _dc.get("membership_list"): + if gk in self.groups.keys(): + self.datacenter.memberships[gk] = self.groups[gk] + + def _update_compute_avail(self): + """Update amount of total available resources.""" + + self.CPU_avail = self.datacenter.avail_vCPUs + self.mem_avail = self.datacenter.avail_mem_cap + self.local_disk_avail = self.datacenter.avail_local_disk_cap + + def update_resource(self): + """Update resource status triggered by placements, events, and batch.""" + + for level in LEVEL: + for _, host_group in self.host_groups.iteritems(): + if host_group.host_type == level: + if host_group.is_available() and host_group.updated: + self._update_host_group(host_group) + + if self.datacenter.updated: + self._update_datacenter() + + self._update_compute_avail() + + def _update_host_group(self, _host_group): + """Update host group (rack) status.""" + + _host_group.init_resources() + del _host_group.server_list[:] + _host_group.init_memberships() + + for _, host in _host_group.child_resources.iteritems(): + if host.is_available(): + _host_group.vCPUs += host.vCPUs + _host_group.avail_vCPUs += host.avail_vCPUs + _host_group.mem_cap += host.mem_cap + _host_group.avail_mem_cap += host.avail_mem_cap + _host_group.local_disk_cap += host.local_disk_cap + _host_group.avail_local_disk_cap += host.avail_local_disk_cap + + for server_info in host.server_list: + _host_group.server_list.append(server_info) + + for gk in host.memberships.keys(): + _host_group.memberships[gk] = host.memberships[gk] + + def _update_datacenter(self): + """Update datacenter status.""" + + self.datacenter.init_resources() + del self.datacenter.server_list[:] + self.datacenter.memberships.clear() + + for _, resource in self.datacenter.resources.iteritems(): + if resource.is_available(): + self.datacenter.vCPUs += resource.vCPUs + self.datacenter.avail_vCPUs += resource.avail_vCPUs + self.datacenter.mem_cap += resource.mem_cap + self.datacenter.avail_mem_cap += resource.avail_mem_cap + self.datacenter.local_disk_cap += resource.local_disk_cap + self.datacenter.avail_local_disk_cap += resource.avail_local_disk_cap + + for s in resource.server_list: + self.datacenter.server_list.append(s) + + for gk in resource.memberships.keys(): + self.datacenter.memberships[gk] = resource.memberships[gk] + + def compute_resources(self, host): + """Compute the amount of resources with oversubsription ratios.""" + + ram_allocation_ratio_list = [] + cpu_allocation_ratio_list = [] + disk_allocation_ratio_list = [] + + for _, g in host.memberships.iteritems(): + if g.group_type == "aggr": + if g.name.startswith("valet:"): + metadata = g.metadata["prior_metadata"] + else: + metadata = g.metadata + + if "ram_allocation_ratio" in metadata.keys(): + if isinstance(metadata["ram_allocation_ratio"], list): + for r in metadata["ram_allocation_ratio"]: + ram_allocation_ratio_list.append(float(r)) + else: + ram_allocation_ratio_list.append(float(metadata["ram_allocation_ratio"])) + if "cpu_allocation_ratio" in metadata.keys(): + if isinstance(metadata["cpu_allocation_ratio"], list): + for r in metadata["cpu_allocation_ratio"]: + cpu_allocation_ratio_list.append(float(r)) + else: + cpu_allocation_ratio_list.append(float(metadata["cpu_allocation_ratio"])) + if "disk_allocation_ratio" in 
metadata.keys(): + if isinstance(metadata["disk_allocation_ratio"], list): + for r in metadata["disk_allocation_ratio"]: + disk_allocation_ratio_list.append(float(r)) + else: + disk_allocation_ratio_list.append(float(metadata["disk_allocation_ratio"])) + + ram_allocation_ratio = 1.0 + if len(ram_allocation_ratio_list) > 0: + ram_allocation_ratio = min(ram_allocation_ratio_list) + else: + if self.default_ram_allocation_ratio > 0: + ram_allocation_ratio = self.default_ram_allocation_ratio + + host.compute_mem(ram_allocation_ratio) + + cpu_allocation_ratio = 1.0 + if len(cpu_allocation_ratio_list) > 0: + cpu_allocation_ratio = min(cpu_allocation_ratio_list) + else: + if self.default_cpu_allocation_ratio > 0: + cpu_allocation_ratio = self.default_cpu_allocation_ratio + + host.compute_cpus(cpu_allocation_ratio) + + disk_allocation_ratio = 1.0 + if len(disk_allocation_ratio_list) > 0: + disk_allocation_ratio = min(disk_allocation_ratio_list) + else: + if self.default_disk_allocation_ratio > 0: + disk_allocation_ratio = self.default_disk_allocation_ratio + + host.compute_disk(disk_allocation_ratio) + + def compute_avail_resources(self, host): + """Compute available amount of resources after placements.""" + + status = host.compute_avail_mem() + if status != "ok": + self.logger.warning(status) + + status = host.compute_avail_cpus() + if status != "ok": + self.logger.warning(status) + + status = host.compute_avail_disk() + if status != "ok": + self.logger.warning(status) + + def mark_host_updated(self, _host_name): + """Mark the host updated.""" + + host = self.hosts[_host_name] + host.updated = True + + if host.host_group is not None: + if isinstance(host.host_group, HostGroup): + self.mark_host_group_updated(host.host_group.name) + else: + self.mark_datacenter_updated() + + def mark_host_group_updated(self, _name): + """Mark the host_group updated.""" + + host_group = self.host_groups[_name] + host_group.updated = True + + if host_group.parent_resource is not None: + if isinstance(host_group.parent_resource, HostGroup): + self.mark_host_group_updated(host_group.parent_resource.name) + else: + self.mark_datacenter_updated() + + def mark_datacenter_updated(self): + """Mark the datacenter updated.""" + + if self.datacenter is not None: + self.datacenter.updated = True + + def get_host_of_server(self, _s_info): + """Check and return host that hosts this server.""" + + host = None + + if len(self.change_of_placements) > 0: + if _s_info["stack_id"] != "none": + sid = _s_info["stack_id"] + ":" + _s_info["name"] + else: + sid = _s_info["uuid"] + + if sid in self.change_of_placements.keys(): + host_name = None + if "host" in self.change_of_placements[sid].keys(): + host_name = self.change_of_placements[sid]["host"] + elif "new_host" in self.change_of_placements[sid].keys(): + host_name = self.change_of_placements[sid]["new_host"] + + if host_name is not None: + host = self.hosts[host_name] + else: + for _, h in self.hosts.iteritems(): + if h.has_server(_s_info): + host = h + break + + return host + + def update_server_placements(self, change_of_placements=None, sync=False): + """Update hosts with the change of server placements. + + Update the available resources of host and NUMA if sync is True. 
+ """ + + if change_of_placements is None: + change_of_placements = self.change_of_placements + + for _, change in change_of_placements.iteritems(): + if "new_host" in change and "old_host" in change: + # Migration case + + old_host = self.hosts[change.get("old_host")] + new_host = self.hosts[change.get("new_host")] + + s_info = change.get("info") + old_info = old_host.get_server_info(s_info) + + if sync: + # Adjust available remaining amount. + + old_flavor = self.get_flavor(old_info.get("flavor_id")) + new_flavor = self.get_flavor(s_info.get("flavor_id")) + + if new_flavor is None or old_flavor is None: + # NOTE(Gueyoung): ignore at this time. + # return False + pass + else: + s_info["vcpus"] = new_flavor.vCPUs + s_info["mem"] = new_flavor.mem_cap + s_info["disk"] = new_flavor.disk_cap + + new_host.deduct_avail_resources(s_info) + + if new_flavor.need_numa_alignment(): + cell = new_host.NUMA.deduct_server_resources(s_info) + s_info["numa"] = cell + + old_info["vcpus"] = old_flavor.vCPUs + old_info["mem"] = old_flavor.mem_cap + old_info["disk"] = old_flavor.disk_cap + + old_host.rollback_avail_resources(old_info) + + if old_flavor.need_numa_alignment(): + old_host.NUMA.rollback_server_resources(old_info) + + old_host.remove_server(old_info) + + new_host.add_server(old_info) + new_host.update_server(s_info) + + self.mark_host_updated(change.get("new_host")) + self.mark_host_updated(change.get("old_host")) + + elif "new_host" in change and "old_host" not in change: + # New server case + + host = self.hosts[change.get("new_host")] + s_info = change.get("info") + + flavor = self.get_flavor(s_info.get("flavor_id")) + + if flavor is None: + # NOTE(Gueyoung): ignore at this time. + # return False + pass + else: + s_info["vcpus"] = flavor.vCPUs + s_info["mem"] = flavor.mem_cap + s_info["disk"] = flavor.disk_cap + + host.deduct_avail_resources(s_info) + + host.add_server(s_info) + + if sync: + if flavor is not None: + # Adjust available remaining amount. + if flavor.need_numa_alignment(): + host.NUMA.deduct_server_resources(s_info) + else: + if s_info.get("numa") != "none": + host.NUMA.add_server(s_info) + + self.mark_host_updated(change.get("new_host")) + + elif "new_host" not in change and "old_host" in change: + # Deletion case + + host = self.hosts[change.get("old_host")] + s_info = change.get("info") + + flavor = self.get_flavor(s_info.get("flavor_id")) + + if flavor is None: + # NOTE(Gueyoung): ignore at this time. + # return False + pass + else: + s_info["vcpus"] = flavor.vCPUs + s_info["mem"] = flavor.mem_cap + s_info["disk"] = flavor.disk_cap + + host.rollback_avail_resources(s_info) + + if flavor.need_numa_alignment(): + host.NUMA.rollback_server_resources(s_info) + + host.remove_server(s_info) + + self.mark_host_updated(change.get("old_host")) + + else: + # Update case + + host = self.hosts[change.get("host")] + s_info = change.get("info") + + if sync: + # Adjust available remaining amount. + + old_info = host.get_server_info(s_info) + + if s_info["flavor_id"] != old_info["flavor_id"]: + old_flavor = self.get_flavor(old_info.get("flavor_id")) + new_flavor = self.get_flavor(s_info.get("flavor_id")) + + if old_flavor is None or new_flavor is None: + # NOTE(Gueyoung): ignore at this time. 
+ # return False + pass + else: + host.rollback_avail_resources(old_info) + + if old_flavor.need_numa_alignment(): + host.NUMA.rollback_server_resources(old_info) + + s_info["vcpus"] = new_flavor.vCPUs + s_info["mem"] = new_flavor.mem_cap + s_info["disk"] = new_flavor.disk_cap + + host.deduct_avail_resources(s_info) + + if new_flavor.need_numa_alignment(): + cell = host.NUMA.deduct_server_resources(s_info) + s_info["numa"] = cell + + new_info = host.update_server(s_info) + + if new_info is not None: + self.mark_host_updated(change.get("host")) + + return True + + def update_server_grouping(self, change_of_placements=None, new_groups=None): + """Update group member_hosts and hosts' memberships + + Caused by server addition, deletion, and migration. + """ + + if change_of_placements is None: + change_of_placements = self.change_of_placements + + if new_groups is None: + new_groups = self._get_new_grouping() + + for _, placement in change_of_placements.iteritems(): + if "new_host" in placement.keys() and "old_host" in placement.keys(): + # Migrated server. This server can be unknown one previously. + + old_host = self.hosts[placement.get("old_host")] + new_host = self.hosts[placement.get("new_host")] + s_info = placement.get("info") + new_info = new_host.get_server_info(s_info) + + # A list of Valet groups + group_list = [] + self.get_groups_of_server(old_host, new_info, group_list) + + _group_list = self._get_groups_of_server(new_info, new_groups) + for gk in _group_list: + if gk not in group_list: + group_list.append(gk) + + self._remove_server_from_groups(old_host, new_info) + + self._add_server_to_groups(new_host, new_info, group_list) + + elif "new_host" in placement.keys() and "old_host" not in placement.keys(): + # New server + + new_host = self.hosts[placement.get("new_host")] + s_info = placement.get("info") + new_s_info = new_host.get_server_info(s_info) + + group_list = self._get_groups_of_server(new_s_info, new_groups) + + self._add_server_to_groups(new_host, new_s_info, group_list) + + elif "new_host" not in placement.keys() and "old_host" in placement.keys(): + # Deleted server. This server can be unknown one previously. + + # Enabled host + host = self.hosts[placement["old_host"]] + + self._remove_server_from_groups(host, placement.get("info")) + + else: + host_name = placement.get("host") + s_info = placement.get("info") + + if host_name in self.hosts.keys(): + host = self.hosts[host_name] + new_info = host.get_server_info(s_info) + + if new_info is not None: + self._update_server_in_groups(host, new_info) + + # To create, delete, and update dynamic Host-Aggregates. + # TODO(Gueyoung): return error if fail to connect to Nova. + self._manage_dynamic_host_aggregates() + + def _get_new_grouping(self, change_of_placements=None): + """Verify and get new hosts' memberships.""" + + if change_of_placements is None: + change_of_placements = self.change_of_placements + + new_groups = {} + + # TODO: grouping verification for 'new' servers. + # by calling verify_pre_valet_placements() + # Should add each host's new memberships. + + # Add host's memberships for server-group. + # Do not need to verify. 
+ for _, placement in change_of_placements.iteritems(): + if "new_host" in placement.keys(): + host = self.hosts[placement.get("new_host")] + s_info = placement.get("info") + new_info = host.get_server_info(s_info) + + for gk, g in self.groups.iteritems(): + if g.factory == "server-group" and g.status == "enabled": + if g.has_server_uuid(new_info.get("uuid")): + if gk not in host.memberships.keys(): + host.memberships[gk] = g + self.mark_host_updated(host.name) + + if gk not in new_groups.keys(): + new_groups[gk] = [] + new_groups[gk].append(new_info) + + return new_groups + + def _get_groups_of_server(self, _s_info, new_groups): + """Check and return group list where server belongs to.""" + + group_list = [] + + _stack_id = _s_info.get("stack_id") + _stack_name = _s_info.get("stack_name") + _uuid = _s_info.get("uuid") + _name = _s_info.get("name") + + for gk, server_list in new_groups.iteritems(): + for s_info in server_list: + if s_info["uuid"] != "none": + if s_info["uuid"] == _uuid: + if gk not in group_list: + group_list.append(gk) + break + + if s_info["name"] != "none": + if s_info["stack_id"] != "none": + if s_info["stack_id"] == _stack_id and \ + s_info["name"] == _name: + if gk not in group_list: + group_list.append(gk) + break + + if s_info["stack_name"] != "none": + if s_info["stack_name"] == _stack_name and \ + s_info["name"] == _name: + if gk not in group_list: + group_list.append(gk) + break + + return group_list + + def get_groups_of_server(self, _host, _s_info, _group_list): + """Get groups where the server is assigned.""" + + for gk in _host.memberships.keys(): + if gk not in self.groups.keys() or self.groups[gk].status != "enabled": + del _host.memberships[gk] + if isinstance(_host, Host): + self.mark_host_updated(_host.name) + elif isinstance(_host, HostGroup): + self.mark_host_group_updated(_host.name) + else: + self.mark_datacenter_updated() + continue + + g = self.groups[gk] + + if g.factory not in ("valet", "server-group"): + continue + + if isinstance(_host, HostGroup): + if g.level != _host.host_type: + continue + + if g.has_server_in_host(_host.name, _s_info): + if gk not in _group_list: + _group_list.append(gk) + + if isinstance(_host, Host) and _host.host_group is not None: + if _host.host_group.is_available(): + self.get_groups_of_server(_host.host_group, _s_info, _group_list) + elif isinstance(_host, HostGroup) and _host.parent_resource is not None: + if _host.parent_resource.is_available(): + if isinstance(_host.parent_resource, HostGroup): + self.get_groups_of_server(_host.parent_resource, _s_info, _group_list) + + def _add_server_to_groups(self, _host, _s_info, _groups): + """Add new server into groups.""" + + for gk in _groups: + # The group must be verified for host membership + if gk not in _host.memberships.keys(): + continue + + if gk not in self.groups.keys() or self.groups[gk].status != "enabled": + del _host.memberships[gk] + if isinstance(_host, Host): + self.mark_host_updated(_host.name) + elif isinstance(_host, HostGroup): + self.mark_host_group_updated(_host.name) + else: + self.mark_datacenter_updated() + continue + + g = self.groups[gk] + + if g.factory not in ("valet", "server-group"): + continue + + if isinstance(_host, HostGroup): + if g.level != _host.host_type: + continue + + if g.factory == "server-group": + g.clean_server(_s_info["uuid"], _host.name) + + if g.add_server(_s_info, _host.name): + g.updated = True + else: + self.logger.warning("server already exists in group") + + if isinstance(_host, Host) and _host.host_group is not 
None: + if _host.host_group.is_available(): + self._add_server_to_groups(_host.host_group, _s_info, _groups) + elif isinstance(_host, HostGroup) and _host.parent_resource is not None: + if _host.parent_resource.is_available(): + if isinstance(_host.parent_resource, HostGroup): + self._add_server_to_groups(_host.parent_resource, _s_info, _groups) + + def _remove_server_from_groups(self, _host, _s_info): + """Remove server from related groups.""" + + for gk in _host.memberships.keys(): + if gk not in self.groups.keys() or self.groups[gk].status != "enabled": + del _host.memberships[gk] + + if isinstance(_host, Host): + self.mark_host_updated(_host.name) + elif isinstance(_host, HostGroup): + self.mark_host_group_updated(_host.name) + else: + self.mark_datacenter_updated() + continue + + g = self.groups[gk] + + if g.factory not in ("valet", "server-group"): + continue + + if isinstance(_host, HostGroup): + if g.level != _host.host_type: + continue + + if g.remove_server(_s_info): + g.updated = True + + if g.remove_server_from_host(_host.name, _s_info): + g.updated = True + + # Remove host from group's membership if the host has no servers of the group. + if g.remove_member(_host.name): + g.updated = True + + # Remove group from host's membership if group does not have the host + # Not consider group has datacenter level. + if isinstance(_host, Host) or isinstance(_host, HostGroup): + if _host.remove_membership(g): + if isinstance(_host, Host): + self.mark_host_updated(_host.name) + elif isinstance(_host, HostGroup): + self.mark_host_group_updated(_host.name) + else: + self.mark_datacenter_updated() + + if len(g.server_list) == 0: + g.status = "disabled" + g.updated = True + + if isinstance(_host, Host) and _host.host_group is not None: + if _host.host_group.is_available(): + self._remove_server_from_groups(_host.host_group, _s_info) + elif isinstance(_host, HostGroup) and _host.parent_resource is not None: + if _host.parent_resource.is_available(): + if isinstance(_host.parent_resource, HostGroup): + self._remove_server_from_groups(_host.parent_resource, _s_info) + + def _update_server_in_groups(self, _host, _s_info): + """Update server info in groups.""" + + for gk in _host.memberships.keys(): + if gk not in self.groups.keys() or self.groups[gk].status != "enabled": + del _host.memberships[gk] + if isinstance(_host, Host): + self.mark_host_updated(_host.name) + elif isinstance(_host, HostGroup): + self.mark_host_group_updated(_host.name) + else: + self.mark_datacenter_updated() + continue + + g = self.groups[gk] + + if g.factory not in ("valet", "server-group"): + continue + + if isinstance(_host, HostGroup): + if g.level != _host.host_type: + continue + + if g.update_server(_s_info): + g.update_server_in_host(_host.name, _s_info) + g.updated = True + + if isinstance(_host, Host) and _host.host_group is not None: + if _host.host_group.is_available(): + self._update_server_in_groups(_host.host_group, _s_info) + elif isinstance(_host, HostGroup) and _host.parent_resource is not None: + if _host.parent_resource.is_available(): + if isinstance(_host.parent_resource, HostGroup): + self._update_server_in_groups(_host.parent_resource, _s_info) + + def add_group(self, _g_name, _g_type, _level, _factory, _host_name): + """Add/Enable group unless the group exists or disabled.""" + + if _g_name not in self.groups.keys(): + group = Group(_g_name) + group.group_type = _g_type + group.factory = _factory + group.level = _level + group.rule = self._get_rule_of_group(_g_name) + group.new = True + 
group.updated = True + self.groups[_g_name] = group + elif self.groups[_g_name].status != "enabled": + self.groups[_g_name].status = "enabled" + self.groups[_g_name].updated = True + + if _host_name in self.hosts.keys(): + host = self.hosts[_host_name] + else: + host = self.host_groups[_host_name] + + # Update host memberships. + if host is not None: + if _g_name not in host.memberships.keys(): + host.memberships[_g_name] = self.groups[_g_name] + + if isinstance(host, Host): + self.mark_host_updated(_host_name) + elif isinstance(host, HostGroup): + self.mark_host_group_updated(_host_name) + + return True + + def _get_rule_of_group(self, _gk): + """Get valet group rule of the given group.""" + + rule_name_elements = _gk.split(':') + rule_name = rule_name_elements[len(rule_name_elements)-1] + + if rule_name in self.group_rules.keys(): + return self.group_rules[rule_name] + + return None + + def get_group_by_uuid(self, _uuid): + """Check and get the group with its uuid.""" + + for _, g in self.groups.iteritems(): + if g.uuid == _uuid: + return g + + return None + + def check_valid_rules(self, _tenant_id, _rule_list, use_ex=True): + """Check if given rules are valid to be used.""" + + for rk in _rule_list: + if rk not in self.group_rules.keys(): + return "not exist rule (" + rk + ")" + + # TODO(Gueyoung): if disabled, + # what to do with placed servers under this rule? + if self.group_rules[rk].status != "enabled": + return "rule (" + rk + ") is not enabled" + + if not use_ex: + if self.group_rules[rk].rule_type == "exclusivity": + return "exclusivity not supported" + + rule = self.group_rules[rk] + if len(rule.members) > 0 and _tenant_id not in rule.members: + return "no valid tenant to use rule (" + rk + ")" + + return "ok" + + def _manage_dynamic_host_aggregates(self): + """Create, delete, or update Host-Aggregates after placement decisions.""" + + for gk in self.groups.keys(): + g = self.groups[gk] + if g.group_type == "exclusivity" and g.status == "enabled": + aggr_name = "valet:" + g.name + if aggr_name not in self.groups.keys(): + # Create Host-Aggregate. + status = self._add_exclusivity_aggregate(aggr_name, g) + # TODO(Gueyoung): return error + if status != "ok": + self.logger.warning("error while adding dynamic host-aggregate") + else: + dha = self.groups[aggr_name] + for hk in g.member_hosts.keys(): + if hk not in dha.member_hosts.keys(): + # Add new host into Host-Aggregate. + status = self._update_exclusivity_aggregate(dha, + self.hosts[hk]) + # TODO(Gueyoung): return error + if status != "ok": + self.logger.warning("error while updating dynamic host-aggregate") + + for gk in self.groups.keys(): + g = self.groups[gk] + if g.group_type == "aggr" and g.status == "enabled": + if g.name.startswith("valet:"): + if g.metadata["valet_type"] == "exclusivity": + name_elements = g.name.split(':', 1) + ex_group_name = name_elements[1] + if ex_group_name not in self.groups.keys() or \ + self.groups[ex_group_name].status != "enabled": + # Delete Host-Aggregate + status = self._remove_exclusivity_aggregate(g) + # TODO(Gueyoung): return error + if status != "ok": + self.logger.warning("error while removing dynamic host-aggregate") + else: + ex_group = self.groups[ex_group_name] + for hk in g.member_hosts.keys(): + if hk not in ex_group.member_hosts.keys(): + # Remove host from Host-Aggregate. 
+ status = self._remove_host_from_exclusivity_aggregate(g, + self.hosts[hk]) + + # TODO(Gueyoung): return error + if status != "ok": + self.logger.warning("error while removing host from dynamic host-aggregate") + + def _add_exclusivity_aggregate(self, _name, _group): + """Create platform Host-Aggregate for Valet rules. + + Exclusivity: create Host-Aggregate, and lock. + """ + + group = Group(_name) + group.group_type = "aggr" + group.level = "host" + group.factory = "nova" + + metadata = {"valet_type": "exclusivity"} + + new_host_list = [] + ex_metadata = {} + + for hk in _group.member_hosts.keys(): + host = self.hosts[hk] + aggregates = host.get_aggregates() + + old_aggregates = [] + for a in aggregates: + if a.name.startswith("valet:"): + continue + + for mk, mv in a.metadata.iteritems(): + if mk not in ex_metadata.keys(): + ex_metadata[mk] = mv + else: + if isinstance(ex_metadata[mk], list): + if mv not in ex_metadata[mk]: + ex_metadata[mk].append(mv) + self.logger.warning("multiple values of metadata key") + else: + if mv != ex_metadata[mk]: + value_list = [ex_metadata[mk], mv] + ex_metadata[mk] = value_list + self.logger.warning("multiple values of metadata key") + + old_aggregates.append(a) + + if hk in a.member_hosts.keys(): + del a.member_hosts[hk] + a.updated = True + + if a.name in host.memberships.keys(): + del host.memberships[a.name] + + if len(old_aggregates) > 0: + metadata[hk] = str(old_aggregates[0].uuid) + for i in range(1, len(old_aggregates)): + metadata[hk] += ("," + str(old_aggregates[i].uuid)) + + new_host_list.append(host) + + metadata["prior_metadata"] = ex_metadata + + group.metadata = metadata + + for host in new_host_list: + group.member_hosts[host.name] = [] + + host.memberships[_name] = group + self.mark_host_updated(host.name) + + group.updated = True + + if not self.metadata.source.valid_client(self.datacenter_url): + self.metadata.source.set_client(self.datacenter_url) + + status = self.metadata.create_exclusive_aggregate(group, + new_host_list) + + self.groups[_name] = group + + return status + + def _update_exclusivity_aggregate(self, _group, _host): + """Update platform Host-Aggregate for Valet rules. + + Exclusivity: update Host-Aggregate, and lock. 
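+
+ As implemented below: the host's pre-existing non-valet aggregates are
+ detached, their uuids recorded under metadata[<host name>], and their
+ metadata folded into metadata["prior_metadata"], so they can be restored
+ once the host leaves the exclusivity aggregate.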
+ """ + + status = "ok" + + aggregates = _host.get_aggregates() + + if _group.group_type == "aggr": + if _host.name not in _group.member_hosts.keys(): + old_aggregates = [] + ex_metadata = _group.metadata["prior_metadata"] + + for a in aggregates: + if a.name.startswith("valet:"): + continue + + for mk, mv in a.metadata.iteritems(): + if mk not in ex_metadata.keys(): + ex_metadata[mk] = mv + else: + if isinstance(ex_metadata[mk], list): + if mv not in ex_metadata[mk]: + ex_metadata[mk].append(mv) + self.logger.warning("multiple values of metadata key") + else: + if mv != ex_metadata[mk]: + value_list = [ex_metadata[mk], mv] + ex_metadata[mk] = value_list + self.logger.warning("multiple values of metadata key") + + old_aggregates.append(a) + + if _host.name in a.member_hosts.keys(): + del a.member_hosts[_host.name] + a.updated = True + + if a.name in _host.memberships.keys(): + del _host.memberships[a.name] + + if len(old_aggregates) > 0: + _group.metadata[_host.name] = str(old_aggregates[0].uuid) + for i in range(1, len(old_aggregates)): + _group.metadata[_host.name] += ("," + str(old_aggregates[i].uuid)) + + _group.metadata["prior_metadata"] = ex_metadata + + _group.member_hosts[_host.name] = [] + _group.updated = True + + _host.memberships[_group.name] = _group + self.mark_host_updated(_host.name) + + if not self.metadata.source.valid_client(self.datacenter_url): + self.metadata.source.set_client(self.datacenter_url) + + status = self.metadata.update_exclusive_aggregate(_group.uuid, + _group.metadata, + _host.name, + old_aggregates) + + return status + + def _remove_exclusivity_aggregate(self, _group): + """Remove dynamic Host-Aggregate.""" + + for hk in _group.member_hosts.keys(): + host = self.hosts[hk] + + status = self._remove_host_from_exclusivity_aggregate(_group, host) + if status != "ok": + self.logger.warning("error while removing host from dynamic host-aggregate") + + del self.groups[_group.name] + + if not self.metadata.source.valid_client(self.datacenter_url): + self.metadata.source.set_client(self.datacenter_url) + + return self.metadata.remove_exclusive_aggregate(_group.uuid) + + def _remove_host_from_exclusivity_aggregate(self, _group, _host): + """Update platform Host-Aggregate for Valet rules. + + Exclusivity: delete host from dynamic Host-Aggregate. + """ + + status = "ok" + + if _group.group_type == "aggr": + if _host.name in _group.member_hosts.keys(): + old_aggregates = [] + if _host.name in _group.metadata.keys(): + aggr_ids = _group.metadata[_host.name].split(',') + + for aid in aggr_ids: + aggr = self.get_group_by_uuid(int(aid)) + if aggr is not None: + aggr.member_hosts[_host.name] = [] + aggr.updated = True + old_aggregates.append(aggr) + + if aggr.name not in _host.memberships.keys(): + _host.memberships[aggr.name] = aggr + + _group.metadata[_host.name] = "" + + del _group.member_hosts[_host.name] + _group.updated = True + + del _host.memberships[_group.name] + self.mark_host_updated(_host.name) + + if not self.metadata.source.valid_client(self.datacenter_url): + self.metadata.source.set_client(self.datacenter_url) + + status = self.metadata.remove_host_from_exclusive_aggregate(_group.uuid, + _group.metadata, + _host.name, + old_aggregates) + + return status + + def sync_with_platform(self, store=False): + """Communicate with platform (e.g., nova) to get resource status. + + Due to dependencies between resource types, + keep the following order of process. 
+ """ + + if len(self.pending_requests) > 0: + return True + + self.logger.info("load data from platform (e.g., nova)") + + # Set the platorm client lib (e.g., novaclient). + if not self.metadata.source.valid_client(self.datacenter_url): + count = 0 + while count < 3: + if not self.metadata.source.set_client(self.datacenter_url): + self.logger.warning("fail to set novaclient: try again") + count += 1 + time.sleep(1) + else: + break + if count == 3: + self.logger.error("fail to set novaclient") + return False + + count = 0 + while count < 3: + # Set each flavor and its metadata. + if not self.metadata.get_flavors(self): + self.logger.warning("fail to get flavors: try again") + count += 1 + time.sleep(1) + else: + break + if count == 3: + self.logger.error("fail to get flavors") + return False + + count = 0 + while count < 3: + # Set each compute host and servers information. + if not self.compute.get_hosts(self): + self.logger.warning("fail to get hosts: try again") + count += 1 + time.sleep(1) + else: + break + if count == 3: + self.logger.error("fail to get hosts") + return False + + # TODO(Gueyoung): need to every time? + # Set the layout between each compute host and rack. + if not self.topology.get_topology(self): + return False + + count = 0 + while count < 3: + # Set the availability-zone, host-aggregate, and server-group + # of each compute host. + if not self.metadata.get_groups(self): + self.logger.warning("fail to get groups: try again") + count += 1 + time.sleep(1) + else: + break + if count == 3: + self.logger.error("fail to get groups") + return False + + # Update total capacities of each host. + # Triggered by overcommit ratio update or newly added. + for _, host in self.hosts.iteritems(): + if host.is_available() and host.updated: + self.compute_resources(host) + + # Update server placements in hosts + # If sync is True, update the available capacities. + if not self.update_server_placements(sync=True): + return False + + # Update the available capacities of each NUMA and host. + # Triggered by unknown server additions and deletions. + for _, host in self.hosts.iteritems(): + if host.is_available() and host.updated: + self.compute_avail_resources(host) + + # Update server grouping changed by deletion and migration of servers. + # TODO(Gueyoung): return False if fail to connect to Nova. + self.update_server_grouping() + + # Update racks (and clusters) and datacenter based on host change. + self.update_resource() + + # TODO: If peoridic batches to collect data from platform is activated, + # should check if there is any update before storing data into DB. + if store: + self.store_resource() + + return True + + def get_flavor(self, _id): + """Get a flavor info.""" + + if isinstance(_id, six.string_types): + flavor_id = _id + else: + flavor_id = str(_id) + + self.logger.debug("fetching flavor = " + flavor_id) + + flavor = None + if flavor_id in self.flavors.keys(): + flavor = self.flavors[flavor_id] + else: + for _, f in self.flavors.iteritems(): + if f.flavor_id == flavor_id: + flavor = f + break + + if flavor is not None: + # Check if detailed information. + # TODO(Gueyoung): what if flavor specs changed from platform? 
+ if flavor.vCPUs == 0: + if not self.metadata.source.valid_client(self.datacenter_url): + count = 0 + while count < 3: + if not self.metadata.source.set_client(self.datacenter_url): + self.logger.warning("fail to set novaclient: try again") + count += 1 + time.sleep(1) + else: + break + if count == 3: + self.logger.error("fail to set novaclient") + return None + + f = self.metadata.source.get_flavor(flavor.flavor_id) + if f is None: + flavor = None + else: + flavor.set_info(f) + flavor.updated = True + + self.logger.debug("flavor (" + flavor.name + ") fetched") + else: + self.logger.warning("unknown flavor = " + flavor_id) + + return flavor + + def store_resource(self, opt=None, req_id=None): + """Store resource status into DB.""" + + flavor_updates = {} + group_updates = {} + host_updates = {} + host_group_updates = {} + + # Do not store disbaled resources. + + for fk, flavor in self.flavors.iteritems(): + # TODO(Gueyoung): store disabled flavor? + flavor_updates[fk] = flavor.get_json_info() + + for gk, group in self.groups.iteritems(): + if group.status == "enabled": + if group.factory != "valet": + group_updates[gk] = group.get_json_info() + + for hk, host in self.hosts.iteritems(): + if host.is_available(): + host_updates[hk] = host.get_json_info() + + for hgk, host_group in self.host_groups.iteritems(): + if host_group.is_available(): + host_group_updates[hgk] = host_group.get_json_info() + + datacenter_update = self.datacenter.get_json_info() + + # If there is pending requests (i.e., not confirmed nor rollbacked), + # do NOT sync with platform when dealing with new request. + # Here, add/remove request from/to pending list + # to track the list of pending requests. + if opt is not None and req_id is not None: + if opt in ("create", "delete", "update"): + self.pending_requests.append(req_id) + elif opt in ("confirm", "rollback"): + for rid in self.pending_requests: + if rid == req_id: + self.pending_requests.remove(rid) + break + + json_update = {'flavors': flavor_updates, 'groups': group_updates, 'hosts': host_updates, + 'host_groups': host_group_updates, 'datacenter': datacenter_update} + + if self.new: + if not self.dbh.create_resource(self.datacenter_id, + self.datacenter_url, + self.pending_requests, + json_update): + return False + else: + if not self.dbh.update_resource(self.datacenter_id, + self.datacenter_url, + self.pending_requests, + json_update): + return False + + if self.new: + self.logger.debug("new datacenter = " + self.datacenter_id) + self.logger.debug(" url = " + self.datacenter_url) + else: + self.logger.debug("updated datacenter = " + self.datacenter_id) + self.logger.debug(" url = " + self.datacenter_url) + self.logger.debug("region = " + json.dumps(json_update['datacenter'], indent=4)) + self.logger.debug("racks = " + json.dumps(json_update['host_groups'], indent=4)) + self.logger.debug("hosts = " + json.dumps(json_update['hosts'], indent=4)) + self.logger.debug("groups = " + json.dumps(json_update['groups'], indent=4)) + self.logger.debug("flavors = ") + for fk, f_info in json_update['flavors'].iteritems(): + if f_info["vCPUs"] > 0: + self.logger.debug(json.dumps(f_info, indent=4)) + + updated_valet_groups = {} + new_valet_groups = {} + deleted_valet_groups = {} + for gk, group in self.groups.iteritems(): + if group.status == "enabled": + if group.factory == "valet": + if group.new: + new_valet_groups[gk] = group.get_json_info() + elif group.updated: + updated_valet_groups[gk] = group.get_json_info() + else: + if group.factory == "valet": + 
deleted_valet_groups[gk] = group.get_json_info() + + for gk, g_info in new_valet_groups.iteritems(): + if not self.dbh.create_valet_group(gk, g_info): + return False + + self.logger.debug("new valet group = " + gk) + self.logger.debug("info = " + json.dumps(g_info, indent=4)) + + for gk, g_info in updated_valet_groups.iteritems(): + if not self.dbh.update_valet_group(gk, g_info): + return False + + self.logger.debug("updated valet group = " + gk) + self.logger.debug("info = " + json.dumps(g_info, indent=4)) + + for gk, g_info in deleted_valet_groups.iteritems(): + if not self.dbh.delete_valet_group(gk): + return False + + self.logger.debug("deleted valet group = " + gk) + self.logger.debug("info = " + json.dumps(g_info, indent=4)) + + return True diff --git a/engine/src/valet/engine/resource_manager/resource_handler.py b/engine/src/valet/engine/resource_manager/resource_handler.py new file mode 100644 index 0000000..38868c7 --- /dev/null +++ b/engine/src/valet/engine/resource_manager/resource_handler.py @@ -0,0 +1,299 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +import json + +from valet.engine.resource_manager.resource import Resource +from valet.engine.resource_manager.resources.group_rule import GroupRule +from valet.engine.resource_manager.resources.host_group import HostGroup + + +class ResourceHandler: + """Handler for dealing with all existing datacenters and their resources.""" + + def __init__(self, _tid, _dbh, _compute, _metadata, _topology, + _config, _logger): + self.end_of_process = False + + self.dbh = _dbh + + self.compute = _compute + self.metadata = _metadata + self.topology = _topology + + self.default_cpu_allocation_ratio = _config.get("default_cpu_allocation_ratio") + self.default_ram_allocation_ratio = _config.get("default_ram_allocation_ratio") + self.default_disk_allocation_ratio = _config.get("default_disk_allocation_ratio") + self.batch_sync_interval = _config.get("batch_sync_interval") + + self.group_rules = {} + self.resource_list = [] + + self.logger = _logger + + def load_group_rules_from_db(self): + """Get all defined valet group rules from DB. + + Note that rules are applied to all datacenters. 
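+
+ Each row is expected to carry at least the fields read below, e.g.
+ (illustrative values only):
+ {"id": "...", "status": "enabled", "app_scope": "lcp",
+ "type": "affinity", "level": "host", "members": "[]",
+ "description": "..."}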
+ """ + + # Init first + self.group_rules = {} + + rule_list = self.dbh.get_group_rules() + if rule_list is None: + return None + + for r in rule_list: + rule = GroupRule(r.get("id")) + + rule.status = r.get("status") + + rule.app_scope = r.get("app_scope") + rule.rule_type = r.get("type") + rule.level = r.get("level") + rule.members = json.loads(r.get("members")) + rule.desc = r.get("description") + + self.group_rules[rule.rule_id] = rule + + return "ok" + + def load_group_rule_from_db(self, _id): + """Get valet group rule from DB.""" + + # Init first + self.group_rules = {} + + r = self.dbh.get_group_rule(_id) + if r is None: + return None + elif len(r) == 0: + return "rule not found" + + rule = GroupRule(r.get("id")) + + rule.status = r.get("status") + + rule.app_scope = r.get("app_scope") + rule.rule_type = r.get("type") + rule.level = r.get("level") + rule.members = json.loads(r.get("members")) + rule.desc = r.get("description") + + self.group_rules[rule.rule_id] = rule + + return "ok" + + def create_group_rule(self, _name, _scope, _type, _level, _members, _desc): + """Create a new group rule in DB.""" + + r = self.dbh.get_group_rule(_name) + if r is None: + return None + elif len(r) > 0: + return "rule already exists" + + if not self.dbh.create_group_rule(_name, _scope, _type, _level, + _members, _desc): + return None + + return "ok" + + def get_rules(self): + """Return basic info of valet rules.""" + + rule_list = [] + + valet_group_list = self.dbh.get_valet_groups() + if valet_group_list is None: + return None + + for rk, rule in self.group_rules.iteritems(): + rule_info = self._get_rule(rule) + + for vg in valet_group_list: + if vg["rule_id"] == rk: + gk = vg.get("id") + gk_elements = gk.split(":") + dc_id = gk_elements[0] + + if dc_id not in rule_info["regions"]: + rule_info["regions"].append(dc_id) + + rule_list.append(rule_info) + + return rule_list + + def _get_rule(self, _rule): + """Return rule info.""" + + rule_info = {} + + rule_info["id"] = _rule.rule_id + rule_info["type"] = _rule.rule_type + rule_info["app_scope"] = _rule.app_scope + rule_info["level"] = _rule.level + rule_info["members"] = _rule.members + rule_info["description"] = _rule.desc + rule_info["status"] = _rule.status + rule_info["regions"] = [] + + return rule_info + + def get_placements_under_rule(self, _rule_name, _resource): + """Get server placements info under given rule in datacenter.""" + + placements = {} + + rule = self.group_rules[_rule_name] + + for gk, g in _resource.groups.iteritems(): + if g.factory == "valet": + if g.rule.rule_id == _rule_name: + placements[gk] = self._get_placements(g, _resource) + + result = {} + result["id"] = rule.rule_id + result["type"] = rule.rule_type + result["app_scope"] = rule.app_scope + result["level"] = rule.level + result["members"] = rule.members + result["description"] = rule.desc + result["status"] = rule.status + result["placements"] = placements + + return result + + def _get_placements(self, _g, _resource): + """Get placement info of servers in group.""" + + placements = {} + + for hk, server_list in _g.member_hosts.iteritems(): + for s_info in server_list: + sid = s_info.get("stack_name") + ":" + s_info.get("name") + placements[sid] = {} + placements[sid]["region"] = _resource.datacenter_id + + if hk in _resource.hosts.keys(): + host = _resource.hosts[hk] + + placements[sid]["host"] = host.name + + hg = host.host_group + if isinstance(hg, HostGroup) and hg.host_type == "rack": + placements[sid]["rack"] = hg.name + else: + placements[sid]["rack"] = "na" + 
+ az = host.get_availability_zone() + az_name_elements = az.name.split(':', 1) + if len(az_name_elements) > 1: + az_name = az_name_elements[1] + else: + az_name = az.name + placements[sid]["availability-zone"] = az_name + + elif hk in _resource.host_groups.keys(): + hg = _resource.host_groups[hk] + + if hg.host_type == "rack": + placements[sid]["rack"] = hg.name + + for hhk, host in hg.child_resources.iteritems(): + if host.has_server(s_info): + placements[sid]["host"] = host.name + + az = host.get_availability_zone() + az_name_elements = az.name.split(':', 1) + if len(az_name_elements) > 1: + az_name = az_name_elements[1] + else: + az_name = az.name + placements[sid]["availability-zone"] = az_name + + break + else: + # TODO(Gueyoung): Look for az, rack and host + placements[sid]["availability-zone"] = "na" + placements[sid]["rack"] = "na" + placements[sid]["host"] = "na" + + else: + placements[sid]["availability-zone"] = "na" + placements[sid]["rack"] = "na" + placements[sid]["host"] = "na" + + return placements + + def load_resource(self, _datacenter): + """Create a resource for placement decisions + + in a given target datacenter. + """ + + # Init first + del self.resource_list[:] + + resource = Resource(_datacenter, self.dbh, + self.compute, self.metadata, self.topology, + self.logger) + + resource.set_config(self.default_cpu_allocation_ratio, + self.default_ram_allocation_ratio, + self.default_disk_allocation_ratio) + + resource.set_group_rules(self.group_rules) + + status = resource.load_resource_from_db() + if status is None: + return False + elif status != "ok": + self.logger.warning(status) + resource.new = True + + self.resource_list.append(resource) + + return True + + def load_resource_with_rule(self, _datacenter): + """Create and return a resource with valet group rule.""" + + # Init first + del self.resource_list[:] + + resource = Resource(_datacenter, self.dbh, + self.compute, self.metadata, self.topology, + self.logger) + + resource.set_config(self.default_cpu_allocation_ratio, + self.default_ram_allocation_ratio, + self.default_disk_allocation_ratio) + + resource.set_group_rules(self.group_rules) + + status = resource.load_resource_from_db() + if status is None: + return None + elif status != "ok": + return status + + self.resource_list.append(resource) + + return "ok" diff --git a/engine/src/valet/engine/resource_manager/resources/__init__.py b/engine/src/valet/engine/resource_manager/resources/__init__.py new file mode 100644 index 0000000..bd50995 --- /dev/null +++ b/engine/src/valet/engine/resource_manager/resources/__init__.py @@ -0,0 +1,18 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# ------------------------------------------------------------------------- +# diff --git a/engine/src/valet/engine/resource_manager/resources/datacenter.py b/engine/src/valet/engine/resource_manager/resources/datacenter.py new file mode 100644 index 0000000..6f03bae --- /dev/null +++ b/engine/src/valet/engine/resource_manager/resources/datacenter.py @@ -0,0 +1,85 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +#!/bin/python + + +class Datacenter(object): + """Container for datacenter resource.""" + + def __init__(self, _name): + self.name = _name + + self.status = "enabled" + + # Enabled group objects (e.g., aggregate) + self.memberships = {} + + self.vCPUs = 0 + self.avail_vCPUs = 0 + + self.mem_cap = 0 # MB + self.avail_mem_cap = 0 + + self.local_disk_cap = 0 # GB, ephemeral + self.avail_local_disk_cap = 0 + + # Enabled host_group (rack) or host objects + self.resources = {} + + # A list of placed servers + self.server_list = [] + + self.updated = False + + def is_available(self): + """Check if host is available.""" + + if self.status == "enabled": + return True + else: + return False + + def init_resources(self): + self.vCPUs = 0 + self.avail_vCPUs = 0 + self.mem_cap = 0 + self.avail_mem_cap = 0 + self.local_disk_cap = 0 + self.avail_local_disk_cap = 0 + + def get_json_info(self): + membership_list = [] + for gk in self.memberships.keys(): + membership_list.append(gk) + + child_list = [] + for ck in self.resources.keys(): + child_list.append(ck) + + return {'status': self.status, + 'name': self.name, + 'membership_list': membership_list, + 'vCPUs': self.vCPUs, + 'avail_vCPUs': self.avail_vCPUs, + 'mem': self.mem_cap, + 'avail_mem': self.avail_mem_cap, + 'local_disk': self.local_disk_cap, + 'avail_local_disk': self.avail_local_disk_cap, + 'children': child_list, + 'server_list': self.server_list} diff --git a/engine/src/valet/engine/resource_manager/resources/flavor.py b/engine/src/valet/engine/resource_manager/resources/flavor.py new file mode 100644 index 0000000..ef8b13e --- /dev/null +++ b/engine/src/valet/engine/resource_manager/resources/flavor.py @@ -0,0 +1,67 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# ------------------------------------------------------------------------- +# +import six + + +class Flavor(object): + """Container for flavor resource.""" + + def __init__(self, _name): + self.name = _name + + self.flavor_id = None + + self.status = "enabled" + + self.vCPUs = 0 + self.mem_cap = 0 # MB + self.disk_cap = 0 # including ephemeral (GB) and swap (MB) + + self.extra_specs = {} + + self.updated = False + + def set_info(self, _f): + """Copy detailed flavor information.""" + + self.status = _f.status + + self.vCPUs = _f.vCPUs + self.mem_cap = _f.mem_cap + self.disk_cap = _f.disk_cap + + for ek, ev in _f.extra_specs.iteritems(): + self.extra_specs[ek] = ev + + def need_numa_alignment(self): + """Check if this flavor requires NUMA alignment.""" + + for key, req in six.iteritems(self.extra_specs): + if key == "hw:numa_nodes" and int(req) == 1: + return True + + return False + + def get_json_info(self): + return {'status': self.status, + 'flavor_id': self.flavor_id, + 'vCPUs': self.vCPUs, + 'mem': self.mem_cap, + 'disk': self.disk_cap, + 'extra_specs': self.extra_specs} diff --git a/engine/src/valet/engine/resource_manager/resources/group.py b/engine/src/valet/engine/resource_manager/resources/group.py new file mode 100644 index 0000000..eef9771 --- /dev/null +++ b/engine/src/valet/engine/resource_manager/resources/group.py @@ -0,0 +1,401 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +class Group(object): + """Container for groups.""" + + def __init__(self, _name): + """Define logical group of compute hosts.""" + + self.name = _name + + self.uuid = None + + # Group includes + # - host-aggregate, availability-zone, + # - server groups: affinity, diversity, soft-affinity, soft-diversity, + # - affinity, diversity, quorum-diversity, exclusivity + self.group_type = None + + self.level = None + + # Where the group is originated + # - 'valet' or 'nova' or 'server-group' or other cloud platform + self.factory = None + + self.status = "enabled" + + # A list of host_names and their placed servers + # Value is a list of server infos. + self.member_hosts = {} + + # For Host-Aggregate group + self.metadata = {} + + # Group rule object for valet groups + self.rule = None + + # A list of placed servers (e.g., VMs) + # Value is a list of server infos. 
+ self.server_list = [] + + self.updated = False + + self.new = False + + def has_server(self, _s_info): + """Check if the server exists in this group.""" + + for s_info in self.server_list: + if _s_info["uuid"] != "none": + if s_info["uuid"] != "none" and \ + s_info["uuid"] == _s_info["uuid"]: + return True + + if _s_info["stack_id"] != "none": + if (s_info["stack_id"] != "none" and \ + s_info["stack_id"] == _s_info["stack_id"]) and \ + s_info["name"] == _s_info["name"]: + return True + + if _s_info["stack_name"] != "none": + if (s_info["stack_name"] != "none" and \ + s_info["stack_name"] == _s_info["stack_name"]) and \ + s_info["name"] == _s_info["name"]: + return True + + return False + + def has_server_uuid(self, _uuid): + """Check if the server exists in this group with uuid.""" + + for s_info in self.server_list: + if s_info["uuid"] == _uuid: + return True + + return False + + def has_server_in_host(self, _host_name, _s_info): + """Check if the server exists in the host in this group.""" + + if _host_name in self.member_hosts.keys(): + server_list = self.member_hosts[_host_name] + + for s_info in server_list: + if _s_info["uuid"] != "none": + if s_info["uuid"] != "none" and \ + s_info["uuid"] == _s_info["uuid"]: + return True + + if _s_info["stack_id"] != "none": + if (s_info["stack_id"] != "none" and \ + s_info["stack_id"] == _s_info["stack_id"]) and \ + s_info["name"] == _s_info["name"]: + return True + + if _s_info["stack_name"] != "none": + if (s_info["stack_name"] != "none" and \ + s_info["stack_name"] == _s_info["stack_name"]) and \ + s_info["name"] == _s_info["name"]: + return True + + return False + + def get_server_info(self, _s_info): + """Get server info.""" + + for s_info in self.server_list: + if _s_info["uuid"] != "none": + if s_info["uuid"] != "none" and \ + s_info["uuid"] == _s_info["uuid"]: + return s_info + + if _s_info["stack_id"] != "none": + if (s_info["stack_id"] != "none" and \ + s_info["stack_id"] == _s_info["stack_id"]) and \ + s_info["name"] == _s_info["name"]: + return s_info + + if _s_info["stack_name"] != "none": + if (s_info["stack_name"] != "none" and \ + s_info["stack_name"] == _s_info["stack_name"]) and \ + s_info["name"] == _s_info["name"]: + return s_info + + return None + + def get_server_info_in_host(self, _host_name, _s_info): + """Get server info.""" + + if _host_name in self.member_hosts.keys(): + server_list = self.member_hosts[_host_name] + + for s_info in server_list: + if _s_info["uuid"] != "none": + if s_info["uuid"] != "none" and \ + s_info["uuid"] == _s_info["uuid"]: + return s_info + + if _s_info["stack_id"] != "none": + if (s_info["stack_id"] != "none" and \ + s_info["stack_id"] == _s_info["stack_id"]) and \ + s_info["name"] == _s_info["name"]: + return s_info + + if _s_info["stack_name"] != "none": + if (s_info["stack_name"] != "none" and \ + s_info["stack_name"] == _s_info["stack_name"]) and \ + s_info["name"] == _s_info["name"]: + return s_info + + return None + + def add_server(self, _s_info, _host_name): + """Add server to this group.""" + + if self.has_server(_s_info): + return False + + if self.has_server_in_host(_host_name, _s_info): + return False + + self.server_list.append(_s_info) + + if self.factory in ("valet", "server-group"): + if _host_name not in self.member_hosts.keys(): + self.member_hosts[_host_name] = [] + + self.member_hosts[_host_name].append(_s_info) + + return True + + def remove_server(self, _s_info): + """Remove server from this group's server_list.""" + + for s_info in self.server_list: + if _s_info["uuid"] 
!= "none": + if s_info["uuid"] != "none" and \ + s_info["uuid"] == _s_info["uuid"]: + self.server_list.remove(s_info) + return True + + if _s_info["stack_id"] != "none": + if (s_info["stack_id"] != "none" and \ + s_info["stack_id"] == _s_info["stack_id"]) and \ + s_info["name"] == _s_info["name"]: + self.server_list.remove(s_info) + return True + + if _s_info["stack_name"] != "none": + if (s_info["stack_name"] != "none" and \ + s_info["stack_name"] == _s_info["stack_name"]) and \ + s_info["name"] == _s_info["name"]: + self.server_list.remove(s_info) + return True + + return False + + def remove_server_from_host(self, _host_name, _s_info): + """Remove server from the host of this group.""" + + if _host_name in self.member_hosts.keys(): + for s_info in self.member_hosts[_host_name]: + if _s_info["uuid"] != "none": + if s_info["uuid"] != "none" and \ + s_info["uuid"] == _s_info["uuid"]: + self.member_hosts[_host_name].remove(s_info) + return True + + if _s_info["stack_id"] != "none": + if (s_info["stack_id"] != "none" and \ + s_info["stack_id"] == _s_info["stack_id"]) and \ + s_info["name"] == _s_info["name"]: + self.member_hosts[_host_name].remove(s_info) + return True + + if _s_info["stack_name"] != "none": + if (s_info["stack_name"] != "none" and \ + s_info["stack_name"] == _s_info["stack_name"]) and \ + s_info["name"] == _s_info["name"]: + self.member_hosts[_host_name].remove(s_info) + return True + + return False + + def remove_member(self, _host_name): + """Remove the host from this group's memberships if it is empty. + + To return the host to pool for other placements. + """ + + if self.factory in ("valet", "server-group"): + if _host_name in self.member_hosts.keys() and \ + len(self.member_hosts[_host_name]) == 0: + del self.member_hosts[_host_name] + + return True + + return False + + def clean_server(self, _uuid, _host_name): + """Clean the server that does not have enriched info.""" + + if _uuid == "none": + return + + for s_info in self.server_list: + if s_info["uuid"] == _uuid and s_info["name"] == "none": + self.server_list.remove(s_info) + break + + if _host_name in self.member_hosts.keys(): + for s_info in self.member_hosts[_host_name]: + if s_info["uuid"] == _uuid and s_info["name"] == "none": + self.member_hosts[_host_name].remove(s_info) + break + + if _host_name in self.member_hosts.keys() and \ + len(self.member_hosts[_host_name]) == 0: + del self.member_hosts[_host_name] + + def update_server(self, _s_info): + """Update server with info from given info. + + The info comes from platform or request (e.g., Heat stack). 
+ """ + + updated = False + + s_info = self.get_server_info(_s_info) + + if s_info is not None: + if _s_info["stack_id"] != "none" and \ + _s_info["stack_id"] != s_info["stack_id"]: + s_info["stack_id"] = _s_info["stack_id"] + updated = True + + if _s_info["uuid"] != "none" and \ + _s_info["uuid"] != s_info["uuid"]: + s_info["uuid"] = _s_info["uuid"] + updated = True + + if _s_info["flavor_id"] != "none" and \ + _s_info["flavor_id"] != s_info["flavor_id"]: + s_info["flavor_id"] = _s_info["flavor_id"] + updated = True + + if _s_info["vcpus"] != -1 and \ + _s_info["vcpus"] != s_info["vcpus"]: + s_info["vcpus"] = _s_info["vcpus"] + updated = True + + if _s_info["mem"] != -1 and \ + _s_info["mem"] != s_info["mem"]: + s_info["mem"] = _s_info["mem"] + updated = True + + if _s_info["disk"] != -1 and \ + _s_info["disk"] != s_info["disk"]: + s_info["disk"] = _s_info["disk"] + updated = True + + if _s_info["image_id"] != "none" and \ + _s_info["image_id"] != s_info["image_id"]: + s_info["image_id"] = _s_info["image_id"] + updated = True + + if _s_info["state"] != "none" and \ + _s_info["state"] != s_info["state"]: + s_info["state"] = _s_info["state"] + updated = True + + if _s_info["status"] != "none" and \ + _s_info["status"] != s_info["status"]: + s_info["status"] = _s_info["status"] + updated = True + + if _s_info["numa"] != "none" and \ + _s_info["numa"] != s_info["numa"]: + s_info["numa"] = _s_info["numa"] + updated = True + + return updated + + def update_server_in_host(self, _host_name, _s_info): + """Updateserver in the host of this group.""" + + if _host_name in self.member_hosts.keys(): + s_info = self.get_server_info_in_host(_host_name, _s_info) + + if s_info is not None: + if _s_info["stack_id"] != "none" and \ + _s_info["stack_id"] != s_info["stack_id"]: + s_info["stack_id"] = _s_info["stack_id"] + + if _s_info["uuid"] != "none" and \ + _s_info["uuid"] != s_info["uuid"]: + s_info["uuid"] = _s_info["uuid"] + + if _s_info["flavor_id"] != "none" and \ + _s_info["flavor_id"] != s_info["flavor_id"]: + s_info["flavor_id"] = _s_info["flavor_id"] + + if _s_info["vcpus"] != -1 and \ + _s_info["vcpus"] != s_info["vcpus"]: + s_info["vcpus"] = _s_info["vcpus"] + + if _s_info["mem"] != -1 and \ + _s_info["mem"] != s_info["mem"]: + s_info["mem"] = _s_info["mem"] + + if _s_info["disk"] != -1 and \ + _s_info["disk"] != s_info["disk"]: + s_info["disk"] = _s_info["disk"] + + if _s_info["image_id"] != "none" and \ + _s_info["image_id"] != s_info["image_id"]: + s_info["image_id"] = _s_info["image_id"] + + if _s_info["state"] != "none" and \ + _s_info["state"] != s_info["state"]: + s_info["state"] = _s_info["state"] + + if _s_info["status"] != "none" and \ + _s_info["status"] != s_info["status"]: + s_info["status"] = _s_info["status"] + + if _s_info["numa"] != "none" and \ + _s_info["numa"] != s_info["numa"]: + s_info["numa"] = _s_info["numa"] + + def get_json_info(self): + """Get group info as JSON format.""" + + rule_id = "none" + if self.rule is not None: + rule_id = self.rule.rule_id + + return {'status': self.status, + 'uuid': self.uuid, + 'group_type': self.group_type, + 'level': self.level, + 'factory': self.factory, + 'rule_id': rule_id, + 'metadata': self.metadata, + 'server_list': self.server_list, + 'member_hosts': self.member_hosts} diff --git a/engine/src/valet/engine/resource_manager/resources/group_rule.py b/engine/src/valet/engine/resource_manager/resources/group_rule.py new file mode 100644 index 0000000..d43dc12 --- /dev/null +++ 
b/engine/src/valet/engine/resource_manager/resources/group_rule.py @@ -0,0 +1,52 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +#!/bin/python + + +class GroupRule(object): + """Container for valet group rule.""" + + def __init__(self, _id): + self.rule_id = _id + + self.status = "enabled" + + self.app_scope = "lcp" + self.rule_type = "affinity" + self.level = "host" + + self.members = [] # a lit of tenent ids who can use this rule + + self.desc = None + + # self.groups = [] # a list of group ids generated under this rule + + self.updated = False + + def get_json_info(self): + """Get group info as JSON format.""" + + return {'status': self.status, + 'app_scope': self.app_scope, + 'rule_type': self.rule_type, + 'level': self.level, + 'members': self.members, + 'desc': self.desc + # 'groups': self.groups + } diff --git a/engine/src/valet/engine/resource_manager/resources/host.py b/engine/src/valet/engine/resource_manager/resources/host.py new file mode 100644 index 0000000..a4d92ba --- /dev/null +++ b/engine/src/valet/engine/resource_manager/resources/host.py @@ -0,0 +1,428 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# ------------------------------------------------------------------------- +# +from valet.engine.resource_manager.resources.numa import NUMA + + +class Host(object): + """Container for compute host.""" + + def __init__(self, _name): + """Define compute host.""" + + self.name = _name + + self.uuid = None + + self.status = "enabled" + self.state = "up" + + # Enabled group objects (e.g., aggregate) this hosting server is in + self.memberships = {} + + self.vCPUs = 0 + self.original_vCPUs = 0 + self.avail_vCPUs = 0 + + self.mem_cap = 0 # MB + self.original_mem_cap = 0 + self.avail_mem_cap = 0 + + self.local_disk_cap = 0 # GB, ephemeral + self.original_local_disk_cap = 0 + self.avail_local_disk_cap = 0 + + self.vCPUs_used = 0 + self.free_mem_mb = 0 + self.free_disk_gb = 0 + self.disk_available_least = 0 + + # To track available cores and memory per NUMA cell + self.NUMA = NUMA() + + self.host_group = None # host_group object (e.g., rack) + + # Kepp a list of placed servers' information + # Here, server_info including {uuid, orch_id, name, + # stack_id, stack_name, + # flavor_id, image_id, tenent_id, + # vcpus, mem, disk, numa, + # state, status} + self.server_list = [] + + # If this host is not defined yet (unknown host). + self.candidate_host_types = {} + + self.updated = False + + def is_available(self): + """Check if host is available.""" + + if self.status == "enabled" and self.state == "up": + return True + else: + return False + + def has_server(self, _s_info): + """Check if server is located in this host.""" + + for s_info in self.server_list: + if _s_info["uuid"] != "none": + if s_info["uuid"] != "none" and \ + s_info["uuid"] == _s_info["uuid"]: + return True + + if _s_info["stack_id"] != "none": + if (s_info["stack_id"] != "none" and \ + s_info["stack_id"] == _s_info["stack_id"]) and \ + s_info["name"] == _s_info["name"]: + return True + + if _s_info["stack_name"] != "none": + if (s_info["stack_name"] != "none" and \ + s_info["stack_name"] == _s_info["stack_name"]) and \ + s_info["name"] == _s_info["name"]: + return True + + return False + + def get_server_info(self, _s_info): + """Get server info.""" + + for s_info in self.server_list: + if _s_info["uuid"] != "none": + if s_info["uuid"] != "none" and \ + s_info["uuid"] == _s_info["uuid"]: + return s_info + + if _s_info["stack_id"] != "none": + if (s_info["stack_id"] != "none" and \ + s_info["stack_id"] == _s_info["stack_id"]) and \ + s_info["name"] == _s_info["name"]: + return s_info + + if _s_info["stack_name"] != "none": + if (s_info["stack_name"] != "none" and \ + s_info["stack_name"] == _s_info["stack_name"]) and \ + s_info["name"] == _s_info["name"]: + return s_info + + return None + + def add_server(self, _s_info): + """Add new server to this host.""" + + self.server_list.append(_s_info) + + def remove_server(self, _s_info): + """Remove server from this host.""" + + for s_info in self.server_list: + if _s_info["uuid"] != "none": + if s_info["uuid"] != "none" and \ + s_info["uuid"] == _s_info["uuid"]: + self.server_list.remove(s_info) + return True + + if _s_info["stack_id"] != "none": + if (s_info["stack_id"] != "none" and \ + s_info["stack_id"] == _s_info["stack_id"]) and \ + s_info["name"] == _s_info["name"]: + self.server_list.remove(s_info) + return True + + if _s_info["stack_name"] != "none": + if (s_info["stack_name"] != "none" and \ + s_info["stack_name"] == _s_info["stack_name"]) and \ + s_info["name"] == _s_info["name"]: + self.server_list.remove(s_info) + return True + + return False + + def update_server(self, 
_s_info): + """Update server with info from given info. + + The info comes from platform or request (e.g., Heat stack). + """ + + updated = None + + s_info = self.get_server_info(_s_info) + + if s_info is not None: + if _s_info["stack_id"] != "none" and \ + _s_info["stack_id"] != s_info["stack_id"]: + s_info["stack_id"] = _s_info["stack_id"] + updated = s_info + + if _s_info["uuid"] != "none" and \ + _s_info["uuid"] != s_info["uuid"]: + s_info["uuid"] = _s_info["uuid"] + updated = s_info + + if _s_info["flavor_id"] != "none" and \ + _s_info["flavor_id"] != s_info["flavor_id"]: + s_info["flavor_id"] = _s_info["flavor_id"] + updated = s_info + + if _s_info["vcpus"] != -1 and \ + _s_info["vcpus"] != s_info["vcpus"]: + s_info["vcpus"] = _s_info["vcpus"] + updated = s_info + + if _s_info["mem"] != -1 and \ + _s_info["mem"] != s_info["mem"]: + s_info["mem"] = _s_info["mem"] + updated = s_info + + if _s_info["disk"] != -1 and \ + _s_info["disk"] != s_info["disk"]: + s_info["disk"] = _s_info["disk"] + updated = s_info + + if _s_info["image_id"] != "none" and \ + _s_info["image_id"] != s_info["image_id"]: + s_info["image_id"] = _s_info["image_id"] + updated = s_info + + if _s_info["state"] != "none" and \ + _s_info["state"] != s_info["state"]: + s_info["state"] = _s_info["state"] + updated = s_info + + if _s_info["status"] != "none" and \ + _s_info["status"] != s_info["status"]: + s_info["status"] = _s_info["status"] + updated = s_info + + if _s_info["numa"] != "none" and \ + _s_info["numa"] != s_info["numa"]: + s_info["numa"] = _s_info["numa"] + updated = s_info + + if updated is not None: + cell = self.NUMA.pop_cell_of_server(updated) + + if updated["numa"] == "none": + if cell != "none": + updated["numa"] = cell + + self.NUMA.add_server(updated) + + return updated + + def remove_membership(self, _g): + """Remove a membership. + + To return to the resource pool for other placements. + """ + + if _g.factory in ("valet", "server-group"): + if self.name not in _g.member_hosts.keys(): + del self.memberships[_g.name] + + return True + + return False + + def compute_cpus(self, _overcommit_ratio): + """Compute and init oversubscribed CPUs.""" + + if self.vCPUs == 0: + # New host case + + self.vCPUs = self.original_vCPUs * _overcommit_ratio + self.avail_vCPUs = self.vCPUs + self.NUMA.init_cpus(self.vCPUs) + else: + vcpus = self.original_vCPUs * _overcommit_ratio + + if vcpus != self.vCPUs: + # Change of overcommit_ratio + + self.NUMA.adjust_cpus(self.vCPUs, vcpus) + + used = self.vCPUs - self.avail_vCPUs + + self.vCPUs = vcpus + self.avail_vCPUs = self.vCPUs - used + + def compute_avail_cpus(self): + """Compute available CPUs after placements.""" + + avail_vcpus = self.vCPUs - self.vCPUs_used + + if avail_vcpus != self.avail_vCPUs: + # Incurred due to unknown server placement. 
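+ # e.g., vCPUs=32 and vCPUs_used=10 give avail_vcpus=22; if the cached
+ # avail_vCPUs was 24, diff=2 and that amount is taken back from the
+ # NUMA cells as well.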
+ + diff = self.avail_vCPUs - avail_vcpus + self.NUMA.apply_unknown_cpus(diff) + + self.avail_vCPUs = avail_vcpus + + return "avail cpus changed (" + str(diff) + ") in " + self.name + + return "ok" + + def compute_mem(self, _overcommit_ratio): + """Compute and init oversubscribed mem capacity.""" + + if self.mem_cap == 0: + # New host case + + self.mem_cap = self.original_mem_cap * _overcommit_ratio + + self.avail_mem_cap = self.mem_cap + + self.NUMA.init_mem(self.mem_cap) + else: + mem_cap = self.original_mem_cap * _overcommit_ratio + + if mem_cap != self.mem_cap: + # Change of overcommit_ratio + + self.NUMA.adjust_mem(self.mem_cap, mem_cap) + + used = self.mem_cap - self.avail_mem_cap + + self.mem_cap = mem_cap + self.avail_mem_cap = self.mem_cap - used + + def compute_avail_mem(self): + """Compute available mem capacity after placements.""" + + used_mem_mb = self.original_mem_cap - self.free_mem_mb + + avail_mem_cap = self.mem_cap - used_mem_mb + + if avail_mem_cap != self.avail_mem_cap: + # Incurred due to unknown server placement. + + diff = self.avail_mem_cap - avail_mem_cap + self.NUMA.apply_unknown_mem(diff) + + self.avail_mem_cap = avail_mem_cap + + return "avail mem changed(" + str(diff) + ") in " + self.name + + return "ok" + + def compute_disk(self, _overcommit_ratio): + """Compute and init oversubscribed disk capacity.""" + + if self.local_disk_cap == 0: + # New host case + + self.local_disk_cap = self.original_local_disk_cap * _overcommit_ratio + + self.avail_local_disk_cap = self.local_disk_cap + else: + local_disk_cap = self.original_local_disk_cap * _overcommit_ratio + + if local_disk_cap != self.local_disk_cap: + # Change of overcommit_ratio + + used = self.local_disk_cap - self.avail_local_disk_cap + + self.local_disk_cap = local_disk_cap + self.avail_local_disk_cap = self.local_disk_cap - used + + def compute_avail_disk(self): + """Compute available disk capacity after placements.""" + + free_disk_cap = self.free_disk_gb + if self.disk_available_least > 0: + free_disk_cap = min(self.free_disk_gb, self.disk_available_least) + + used_disk_cap = self.original_local_disk_cap - free_disk_cap + + avail_local_disk_cap = self.local_disk_cap - used_disk_cap + + if avail_local_disk_cap != self.avail_local_disk_cap: + diff = self.avail_local_disk_cap - avail_local_disk_cap + + self.avail_local_disk_cap = avail_local_disk_cap + + return "avail disk changed(" + str(diff) + ") in " + self.name + + return "ok" + + def deduct_avail_resources(self, _s_info): + """Deduct available amount of resources of this host.""" + + if _s_info.get("vcpus") != -1: + self.avail_vCPUs -= _s_info.get("vcpus") + self.avail_mem_cap -= _s_info.get("mem") + self.avail_local_disk_cap -= _s_info.get("disk") + + def rollback_avail_resources(self, _s_info): + """Rollback available amount of resources of this host.""" + + if _s_info.get("vcpus") != -1: + self.avail_vCPUs += _s_info.get("vcpus") + self.avail_mem_cap += _s_info.get("mem") + self.avail_local_disk_cap += _s_info.get("disk") + + def get_availability_zone(self): + """Get the availability-zone of this host.""" + + for gk, g in self.memberships.iteritems(): + if g.group_type == "az": + return g + + return None + + def get_aggregates(self): + """Get the list of Host-Aggregates of this host.""" + + aggregates = [] + + for gk, g in self.memberships.iteritems(): + if g.group_type == "aggr": + aggregates.append(g) + + return aggregates + + def get_json_info(self): + """Get compute host info as JSON format""" + + membership_list = [] + for gk in 
self.memberships.keys(): + membership_list.append(gk) + + return {'status': self.status, 'state': self.state, + 'uuid': self.uuid, + 'membership_list': membership_list, + 'vCPUs': self.vCPUs, + 'original_vCPUs': self.original_vCPUs, + 'avail_vCPUs': self.avail_vCPUs, + 'mem': self.mem_cap, + 'original_mem': self.original_mem_cap, + 'avail_mem': self.avail_mem_cap, + 'local_disk': self.local_disk_cap, + 'original_local_disk': self.original_local_disk_cap, + 'avail_local_disk': self.avail_local_disk_cap, + 'vCPUs_used': self.vCPUs_used, + 'free_mem_mb': self.free_mem_mb, + 'free_disk_gb': self.free_disk_gb, + 'disk_available_least': self.disk_available_least, + 'NUMA': self.NUMA.get_json_info(), + 'parent': self.host_group.name, + 'server_list': self.server_list, + 'candidate_host_types': self.candidate_host_types} diff --git a/engine/src/valet/engine/resource_manager/resources/host_group.py b/engine/src/valet/engine/resource_manager/resources/host_group.py new file mode 100644 index 0000000..ebf9a15 --- /dev/null +++ b/engine/src/valet/engine/resource_manager/resources/host_group.py @@ -0,0 +1,108 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +from valet.engine.app_manager.group import LEVEL + + +class HostGroup(object): + """Container for host group (rack).""" + + def __init__(self, _id): + self.name = _id + + self.status = "enabled" + self.host_group = None + + # 'rack' or 'cluster' (e.g., power domain, zone) + self.host_type = "rack" + + self.parent_resource = None # e.g., datacenter object + self.child_resources = {} # e.g., hosting server objects + + # Enabled group objects (e.g., aggregate) in this group + self.memberships = {} + + self.vCPUs = 0 + self.avail_vCPUs = 0 + + self.mem_cap = 0 # MB + self.avail_mem_cap = 0 + + self.local_disk_cap = 0 # GB, ephemeral + self.avail_local_disk_cap = 0 + + # A list of placed servers' info + self.server_list = [] + + self.updated = False + + def is_available(self): + if self.status == "enabled": + return True + else: + return False + + def init_resources(self): + self.vCPUs = 0 + self.avail_vCPUs = 0 + self.mem_cap = 0 # MB + self.avail_mem_cap = 0 + self.local_disk_cap = 0 # GB, ephemeral + self.avail_local_disk_cap = 0 + + def init_memberships(self): + for gk in self.memberships.keys(): + g = self.memberships[gk] + + if g.factory == "valet": + if LEVEL.index(g.level) < LEVEL.index(self.host_type): + del self.memberships[gk] + else: + del self.memberships[gk] + + def remove_membership(self, _g): + """Remove a membership. 
""" + + if _g.factory == "valet": + if self.name not in _g.member_hosts.keys(): + del self.memberships[_g.name] + return True + + return False + + def get_json_info(self): + membership_list = [] + for gk in self.memberships.keys(): + membership_list.append(gk) + + child_list = [] + for ck in self.child_resources.keys(): + child_list.append(ck) + + return {'status': self.status, + 'host_type': self.host_type, + 'membership_list': membership_list, + 'vCPUs': self.vCPUs, + 'avail_vCPUs': self.avail_vCPUs, + 'mem': self.mem_cap, + 'avail_mem': self.avail_mem_cap, + 'local_disk': self.local_disk_cap, + 'avail_local_disk': self.avail_local_disk_cap, + 'parent': self.parent_resource.name, + 'children': child_list, + 'server_list': self.server_list} diff --git a/engine/src/valet/engine/resource_manager/resources/numa.py b/engine/src/valet/engine/resource_manager/resources/numa.py new file mode 100644 index 0000000..c6c9542 --- /dev/null +++ b/engine/src/valet/engine/resource_manager/resources/numa.py @@ -0,0 +1,264 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +class NUMA(object): + """Container for NUMA cells.""" + + def __init__(self, numa=None): + """Init NUMA cells. + + Assume 2 NUMA cells of each compute host + """ + + self.cell_0 = {} + + # Available resources + self.cell_0["cpus"] = 0 + self.cell_0["mem"] = 0 + + # A list of server infos + self.cell_0["server_list"] = [] + + self.cell_1 = {} + + # Available resources + self.cell_1["cpus"] = 0 + self.cell_1["mem"] = 0 + + # A list of server infos + self.cell_1["server_list"] = [] + + if numa is not None: + self.cell_0["cpus"] = numa["cell_0"]["cpus"] + self.cell_0["mem"] = numa["cell_0"]["mem"] + self.cell_0["server_list"] = numa["cell_0"]["server_list"] + + self.cell_1["cpus"] = numa["cell_1"]["cpus"] + self.cell_1["mem"] = numa["cell_1"]["mem"] + self.cell_1["server_list"] = numa["cell_1"]["server_list"] + + def init_cpus(self, _cpus): + """Apply CPU capacity faily across NUMA cells. + + Caused by new compute host. + """ + + div = int(float(_cpus) / 2.0) + + self.cell_0["cpus"] = div + self.cell_1["cpus"] = (_cpus - div) + + def init_mem(self, _mem): + """Apply mem capacity faily across NUMA cells. + + Caused by new compute host. + """ + + div = int(float(_mem) / 2.0) + + self.cell_0["mem"] = div + self.cell_1["mem"] = (_mem - div) + + def adjust_cpus(self, _old_cpus, _new_cpus): + """Adjust CPU capacity across NUMA cells. + + Caused by change in compute host. 
+ """ + + div = int(float(_old_cpus) / 2.0) + + old_cpus_0 = div + old_cpus_1 = (_old_cpus - div) + + used_0 = old_cpus_0 - self.cell_0["cpus"] + used_1 = old_cpus_1 - self.cell_1["cpus"] + + div = int(float(_new_cpus) / 2.0) + + self.cell_0["cpus"] = div - used_0 + self.cell_1["cpus"] = _new_cpus - div - used_1 + + def adjust_mem(self, _old_mem, _new_mem): + """Adjust mem capacity across NUMA cells. + + Caused by change in compute host. + """ + + div = int(float(_old_mem) / 2.0) + + old_mem_0 = div + old_mem_1 = (_old_mem - div) + + used_0 = old_mem_0 - self.cell_0["mem"] + used_1 = old_mem_1 - self.cell_1["mem"] + + div = int(float(_new_mem) / 2.0) + + self.cell_0["mem"] = div - used_0 + self.cell_1["mem"] = _new_mem - div - used_1 + + def has_enough_resources(self, _vcpus, _mem): + """Check if any cell has enough resources.""" + + if _vcpus <= self.cell_0["cpus"] and _mem <= self.cell_0["mem"]: + return True + + if _vcpus <= self.cell_1["cpus"] and _mem <= self.cell_1["mem"]: + return True + + return False + + def pop_cell_of_server(self, _s_info): + """Get which cell server is placed.""" + + cell = None + + for s_info in self.cell_0["server_list"]: + if _s_info["uuid"] != "none": + if s_info["uuid"] != "none" and \ + s_info["uuid"] == _s_info["uuid"]: + cell = "cell_0" + self.cell_0["server_list"].remove(s_info) + break + + if _s_info["stack_id"] != "none": + if (s_info["stack_id"] != "none" and \ + s_info["stack_id"] == _s_info["stack_id"]) and \ + s_info["name"] == _s_info["name"]: + cell = "cell_0" + self.cell_0["server_list"].remove(s_info) + break + + if _s_info["stack_name"] != "none": + if (s_info["stack_name"] != "none" and \ + s_info["stack_name"] == _s_info["stack_name"]) and \ + s_info["name"] == _s_info["name"]: + cell = "cell_0" + self.cell_0["server_list"].remove(s_info) + break + + if cell is None: + for s_info in self.cell_1["server_list"]: + if _s_info["uuid"] != "none": + if s_info["uuid"] != "none" and \ + s_info["uuid"] == _s_info["uuid"]: + cell = "cell_1" + self.cell_1["server_list"].remove(s_info) + break + + if _s_info["stack_id"] != "none": + if (s_info["stack_id"] != "none" and \ + s_info["stack_id"] == _s_info["stack_id"]) and \ + s_info["name"] == _s_info["name"]: + cell = "cell_1" + self.cell_1["server_list"].remove(s_info) + break + + if _s_info["stack_name"] != "none": + if (s_info["stack_name"] != "none" and \ + s_info["stack_name"] == _s_info["stack_name"]) and \ + s_info["name"] == _s_info["name"]: + cell = "cell_1" + self.cell_1["server_list"].remove(s_info) + break + + if cell is None: + return "none" + else: + return cell + + def deduct_server_resources(self, _s_info): + """Reduce the available resources in a cell by adding a server.""" + + self.pop_cell_of_server(_s_info) + + if self.cell_0["cpus"] > self.cell_1["cpus"]: + self.cell_0["cpus"] -= _s_info.get("vcpus") + self.cell_0["mem"] -= _s_info.get("mem") + self.cell_0["server_list"].append(_s_info) + return "cell_0" + else: + self.cell_1["cpus"] -= _s_info.get("vcpus") + self.cell_1["mem"] -= _s_info.get("mem") + self.cell_1["server_list"].append(_s_info) + return "cell_1" + + def rollback_server_resources(self, _s_info): + """Rollback the server placement in cell by removing server.""" + + cell = self.pop_cell_of_server(_s_info) + + if cell == "cell_0": + self.cell_0["cpus"] += _s_info.get("vcpus") + self.cell_0["mem"] += _s_info.get("mem") + elif cell == "cell_1": + self.cell_1["cpus"] += _s_info.get("vcpus") + self.cell_1["mem"] += _s_info.get("mem") + + # TODO: need to non-NUMA server? 
+ # else: + # self.apply_cpus_fairly(-1.0*_cpus) + # self.apply_mem_fairly(-1.0*_mem) + + def add_server(self, _s_info): + """Add the server info into the cell.""" + + if _s_info["numa"] == "cell_0": + self.cell_0["server_list"].append(_s_info) + elif _s_info["numa"] == "cell_1": + self.cell_1["server_list"].append(_s_info) + + def apply_unknown_cpus(self, _diff): + """Apply unknown cpus fairly across cells.""" + + if _diff > 0: + # Deduct + + div = int(float(_diff) / 2.0) + self.cell_0["cpus"] -= div + self.cell_1["cpus"] -= (_diff - div) + elif _diff < 0: + # Rollback + _diff *= -1 + + div = int(float(_diff) / 2.0) + self.cell_0["cpus"] += div + self.cell_1["cpus"] += (_diff - div) + + def apply_unknown_mem(self, _diff): + """Apply unknown mem capacity fairly across cells.""" + + if _diff > 0: + # Deduct + + div = int(float(_diff) / 2.0) + self.cell_0["mem"] -= div + self.cell_1["mem"] -= (_diff - div) + elif _diff < 0: + # Rollback + _diff *= -1 + + div = int(float(_diff) / 2.0) + self.cell_0["mem"] += div + self.cell_1["mem"] += (_diff - div) + + def get_json_info(self): + """Get NUMA info as JSON format""" + + return {'cell_0': self.cell_0, + 'cell_1': self.cell_1} diff --git a/engine/src/valet/engine/resource_manager/topology_manager.py b/engine/src/valet/engine/resource_manager/topology_manager.py new file mode 100644 index 0000000..f8422d3 --- /dev/null +++ b/engine/src/valet/engine/resource_manager/topology_manager.py @@ -0,0 +1,237 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# ------------------------------------------------------------------------- +# +#!/bin/python + + +from valet.engine.resource_manager.resources.datacenter import Datacenter +from valet.engine.resource_manager.resources.host_group import HostGroup + + +class TopologyManager(object): + """Manager to maintain the layout of datacenter.""" + + def __init__(self, _source, _logger): + self.source = _source + + self.datacenter = None + self.host_groups = {} + self.hosts = {} + + self.logger = _logger + + def get_topology(self, _resource): + """Set datacenter layout into resource.""" + + self.logger.info("set datacenter layout...") + + # Init first + self.datacenter = Datacenter(_resource.datacenter_id) + self.host_groups.clear() + self.hosts.clear() + + if self.source.get_topology(self.datacenter, self.host_groups, self.hosts, + _resource.hosts) != "ok": + return False + + self._check_updated(_resource) + + return True + + def _check_updated(self, _resource): + """Check if the layout is changed.""" + + if _resource.datacenter is None: + _resource.datacenter = Datacenter(_resource.datacenter_id) + _resource.datacenter.updated = True + + self.logger.info("new datacenter (" + _resource.datacenter_id + ") added") + + for hgk in self.host_groups.keys(): + if hgk not in _resource.host_groups.keys(): + new_host_group = HostGroup(hgk) + new_host_group.host_type = self.host_groups[hgk].host_type + + _resource.host_groups[new_host_group.name] = new_host_group + _resource.mark_host_group_updated(hgk) + + self.logger.info("new host_group (" + hgk + ") added") + + for rhgk in _resource.host_groups.keys(): + if rhgk not in self.host_groups.keys(): + host_group = _resource.host_groups[rhgk] + host_group.status = "disabled" + host_group.mark_host_group_updated(rhgk) + + self.logger.info("host_group (" + rhgk + ") disabled") + + # TODO(Gueyoung): what if host exists in topology, + # but does not in resource (DB or platform)? 
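        # The loop below covers the reverse case: a host that the resource
        # store still lists as available but that no longer appears in the
        # topology read from the source is set to "disabled" and marked
        # updated.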
+ + for rhk in _resource.hosts.keys(): + if not _resource.hosts[rhk].is_available(): + continue + + if rhk not in self.hosts.keys(): + _resource.hosts[rhk].status = "disabled" + _resource.mark_host_updated(rhk) + + self.logger.info("host (" + rhk + ") removed from topology") + + if self._is_datacenter_updated(_resource): + _resource.datacenter.updated = True + + for hgk in self.host_groups.keys(): + hg = self.host_groups[hgk] + + if self._is_host_group_updated(hg, _resource): + _resource.mark_host_group_updated(hgk) + + for hk in self.hosts.keys(): + if hk in _resource.hosts.keys(): + if not _resource.hosts[hk].is_available(): + continue + + host = self.hosts[hk] + + if self._is_host_updated(host, _resource): + _resource.mark_host_updated(hk) + + # TODO(Gueyoung): Hierachical failure propagation + + def _is_datacenter_updated(self, _resource): + """Check if datacenter's resources are changed.""" + + updated = False + + _rdatacenter = _resource.datacenter + + for rk in self.datacenter.resources.keys(): + + h = None + if rk in _resource.host_groups.keys(): + h = _resource.host_groups[rk] + elif rk in _resource.hosts.keys(): + h = _resource.hosts[rk] + + if h is not None and h.is_available(): + if rk not in _rdatacenter.resources.keys() or h.updated: + _rdatacenter.resources[rk] = h + updated = True + + self.logger.info("datacenter updated (new resource)") + + for rk in _rdatacenter.resources.keys(): + + h = None + if rk in _resource.host_groups.keys(): + h = _resource.host_groups[rk] + elif rk in _resource.hosts.keys(): + h = _resource.hosts[rk] + + if h is None or \ + not h.is_available() or \ + rk not in self.datacenter.resources.keys(): + del _rdatacenter.resources[rk] + updated = True + + self.logger.info("datacenter updated (resource removed)") + + return updated + + def _is_host_group_updated(self, _hg, _resource): + """Check if host_group's parent or children are changed.""" + + updated = False + + _rhg = _resource.host_groups[_hg.name] + + if _hg.host_type != _rhg.host_type: + _rhg.host_type = _hg.host_type + updated = True + self.logger.info("host_group (" + _rhg.name + ") updated (hosting type)") + + if _rhg.parent_resource is None or \ + _rhg.parent_resource.name != _hg.parent_resource.name: + if _hg.parent_resource.name in _resource.host_groups.keys(): + hg = _resource.host_groups[_hg.parent_resource.name] + if hg.is_available(): + _rhg.parent_resource = hg + updated = True + elif _hg.parent_resource.name == _resource.datacenter.name: + _rhg.parent_resource = _resource.datacenter + updated = True + + if updated: + self.logger.info("host_group (" + _rhg.name + ") updated (parent host_group)") + + for rk in _hg.child_resources.keys(): + + h = None + if rk in _resource.hosts.keys(): + h = _resource.hosts[rk] + elif rk in _resource.host_groups.keys(): + h = _resource.host_groups[rk] + + if h is not None and h.is_available(): + if rk not in _rhg.child_resources.keys() or h.updated: + _rhg.child_resources[rk] = h + updated = True + + self.logger.info("host_group (" + _rhg.name + ") updated (new child host)") + + for rk in _rhg.child_resources.keys(): + + h = None + if rk in _resource.hosts.keys(): + h = _resource.hosts[rk] + elif rk in _resource.host_groups.keys(): + h = _resource.host_groups[rk] + + if h is None or \ + not h.is_available() or \ + rk not in _hg.child_resources.keys(): + del _rhg.child_resources[rk] + updated = True + + self.logger.info("host_group (" + _rhg.name + ") updated (child host removed)") + + return updated + + def _is_host_updated(self, _host, _resource): + 
"""Check if host's parent (e.g., rack) is changed.""" + + updated = False + + _rhost = _resource.hosts[_host.name] + + if _rhost.host_group is None or \ + _rhost.host_group.name != _host.host_group.name: + if _host.host_group.name in _resource.host_groups.keys(): + rhost_group = _resource.host_groups[_host.host_group.name] + if rhost_group.is_available(): + _rhost.host_group = rhost_group + updated = True + elif _host.host_group.name == _resource.datacenter.name: + _rhost.host_group = _resource.datacenter + updated = True + + if updated: + self.logger.info("host (" + _rhost.name + ") updated (host_group)") + + return False diff --git a/engine/src/valet/engine/search/__init__.py b/engine/src/valet/engine/search/__init__.py new file mode 100644 index 0000000..bd50995 --- /dev/null +++ b/engine/src/valet/engine/search/__init__.py @@ -0,0 +1,18 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# diff --git a/engine/src/valet/engine/search/avail_resources.py b/engine/src/valet/engine/search/avail_resources.py new file mode 100644 index 0000000..c4484b8 --- /dev/null +++ b/engine/src/valet/engine/search/avail_resources.py @@ -0,0 +1,76 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +#!/bin/python + + +from valet.engine.app_manager.group import LEVEL + + +class AvailResources(object): + """Container to keep hosting resources and candidate resources + + of each level (host or rack) for search. 
+ """ + + def __init__(self, _level): + self.level = _level + self.avail_hosts = {} + self.candidates = {} + + def set_next_level(self): + """Get the next level to search.""" + + current_level_index = LEVEL.index(self.level) + next_level_index = current_level_index - 1 + + if next_level_index < 0: + self.level = LEVEL[0] + else: + self.level = LEVEL[next_level_index] + + def set_next_avail_hosts(self, _avail_hosts, _resource_of_level): + """Set the next level of available hosting resources.""" + + for hk, h in _avail_hosts.iteritems(): + if self.level == "rack": + if h.rack_name == _resource_of_level: + self.avail_hosts[hk] = h + elif self.level == "host": + if h.host_name == _resource_of_level: + self.avail_hosts[hk] = h + + def set_candidates(self): + if self.level == "rack": + for _, h in self.avail_hosts.iteritems(): + self.candidates[h.rack_name] = h + elif self.level == "host": + self.candidates = self.avail_hosts + + def get_candidate(self, _resource): + candidate = None + + if self.level == "rack": + for _, h in self.avail_hosts.iteritems(): + if h.rack_name == _resource.rack_name: + candidate = h + elif self.level == "host": + if _resource.host_name in self.avail_hosts.keys(): + candidate = self.avail_hosts[_resource.host_name] + + return candidate diff --git a/engine/src/valet/engine/search/constraint_solver.py b/engine/src/valet/engine/search/constraint_solver.py new file mode 100644 index 0000000..2593db8 --- /dev/null +++ b/engine/src/valet/engine/search/constraint_solver.py @@ -0,0 +1,117 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +from valet.engine.search.filters.affinity_filter import AffinityFilter +from valet.engine.search.filters.aggregate_instance_filter import AggregateInstanceExtraSpecsFilter +from valet.engine.search.filters.az_filter import AvailabilityZoneFilter +from valet.engine.search.filters.cpu_filter import CPUFilter +from valet.engine.search.filters.disk_filter import DiskFilter +from valet.engine.search.filters.diversity_filter import DiversityFilter +from valet.engine.search.filters.dynamic_aggregate_filter import DynamicAggregateFilter +from valet.engine.search.filters.exclusivity_filter import ExclusivityFilter +from valet.engine.search.filters.mem_filter import MemFilter +from valet.engine.search.filters.no_exclusivity_filter import NoExclusivityFilter +from valet.engine.search.filters.numa_filter import NUMAFilter +from valet.engine.search.filters.quorum_diversity_filter import QuorumDiversityFilter + + +class ConstraintSolver(object): + """Constraint solver to filter out candidate hosts.""" + + def __init__(self, _logger): + """Define fileters and application order.""" + + self.logger = _logger + + self.filter_list = [] + + # TODO(Gueyoung): add soft-affinity and soft-diversity filters + + # TODO(Gueyoung): the order of applying filters? 
+ + # Apply platform filters first + self.filter_list.append(AvailabilityZoneFilter()) + self.filter_list.append(AggregateInstanceExtraSpecsFilter()) + self.filter_list.append(CPUFilter()) + self.filter_list.append(MemFilter()) + self.filter_list.append(DiskFilter()) + self.filter_list.append(NUMAFilter()) + + # Apply Valet filters next + self.filter_list.append(DiversityFilter()) + self.filter_list.append(QuorumDiversityFilter()) + self.filter_list.append(ExclusivityFilter()) + self.filter_list.append(NoExclusivityFilter()) + self.filter_list.append(AffinityFilter()) + + # Apply dynamic aggregate filter to determine the host's aggregate + # in a lazy way. + self.filter_list.append(DynamicAggregateFilter()) + + self.status = "ok" + + def get_candidate_list(self, _n, _avail_resources, _avail_hosts, _avail_groups): + """Filter candidate hosts using a list of filters.""" + + level = _avail_resources.level + + candidate_list = [] + + # This is the resource which name is 'any' + ghost_candidate = None + + for _, r in _avail_resources.candidates.iteritems(): + candidate_list.append(r) + + if r.get_resource_name(level) == "any": + ghost_candidate = r + + if len(candidate_list) == 0: + self.status = "no candidate for node = " + _n.vid + self.logger.warning(self.status) + return [] + + for f in self.filter_list: + f.init_condition() + + if not f.check_pre_condition(level, _n, _avail_hosts, _avail_groups): + if f.status is not None: + self.status = f.status + self.logger.error(self.status) + return [] + else: + self.logger.debug("skip " + f.name + " constraint for node = " + _n.vid) + + continue + + candidate_list = f.filter_candidates(level, _n, candidate_list) + + if ghost_candidate and ghost_candidate not in candidate_list: + candidate_list.append(ghost_candidate) + + if len(candidate_list) == 0: + self.status = "violate " + level + " " + f.name + " constraint for node = " + _n.vid + if f.status is not None: + self.status += " detail: " + f.status + self.logger.debug(self.status) + return [] + elif len(candidate_list) > 0: + str_num = str(len(candidate_list)) + self.logger.debug("pass " + f.name + " constraint for node = " + _n.vid + " with " + str_num) + + return candidate_list diff --git a/engine/src/valet/engine/search/filters/__init__.py b/engine/src/valet/engine/search/filters/__init__.py new file mode 100644 index 0000000..bd50995 --- /dev/null +++ b/engine/src/valet/engine/search/filters/__init__.py @@ -0,0 +1,18 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# ------------------------------------------------------------------------- +# diff --git a/engine/src/valet/engine/search/filters/affinity_filter.py b/engine/src/valet/engine/search/filters/affinity_filter.py new file mode 100644 index 0000000..fb9aadc --- /dev/null +++ b/engine/src/valet/engine/search/filters/affinity_filter.py @@ -0,0 +1,69 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +from valet.engine.app_manager.group import Group + + +class AffinityFilter(object): + + def __init__(self): + self.name = "affinity" + + self.affinity_id = None + self.is_first = True + + self.status = None + + def init_condition(self): + self.affinity_id = None + self.is_first = True + self.status = None + + def check_pre_condition(self, _level, _v, _avail_hosts, _avail_groups): + if isinstance(_v, Group): + self.affinity_id = _v.vid + + if self.affinity_id in _avail_groups.keys(): + self.is_first = False + + if self.affinity_id is not None: + return True + else: + return False + + def filter_candidates(self, _level, _v, _candidate_list): + if self.is_first: + return _candidate_list + + candidate_list = [] + + for c in _candidate_list: + if self._check_candidate(_level, c): + candidate_list.append(c) + + return candidate_list + + def _check_candidate(self, _level, _candidate): + """Filter based on named affinity group.""" + + memberships = _candidate.get_all_memberships(_level) + for gk, gr in memberships.iteritems(): + if gr.group_type == "affinity" and gk == self.affinity_id: + return True + + return False diff --git a/engine/src/valet/engine/search/filters/aggregate_instance_filter.py b/engine/src/valet/engine/search/filters/aggregate_instance_filter.py new file mode 100644 index 0000000..316388e --- /dev/null +++ b/engine/src/valet/engine/search/filters/aggregate_instance_filter.py @@ -0,0 +1,106 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# ------------------------------------------------------------------------- +# +import filter_utils +import six + + +_SCOPE = 'aggregate_instance_extra_specs' + + +class AggregateInstanceExtraSpecsFilter(object): + """AggregateInstanceExtraSpecsFilter works with InstanceType records.""" + + def __init__(self): + self.name = "aggregate-instance-extra-specs" + self.avail_hosts = {} + self.status = None + + def init_condition(self): + self.avail_hosts = {} + self.status = None + + def check_pre_condition(self, _level, _v, _avail_hosts, _avail_groups): + if len(_v.extra_specs_list) > 0: + self.avail_hosts = _avail_hosts + return True + else: + return False + + def filter_candidates(self, _level, _v, _candidate_list): + candidate_list = [] + + for c in _candidate_list: + if self._check_candidate(_level, _v, c): + candidate_list.append(c) + + return candidate_list + + def _check_candidate(self, _level, _v, _candidate): + """Check given candidate host if instance's extra specs matches to metadata.""" + + # If the candidate's host_type is not determined, skip the filter. + if _level == "host": + if len(_candidate.candidate_host_types) > 0: + return True + else: + # In rack level, if any host's host_type in the rack is not determined, + # skip the filter + for _, rh in self.avail_hosts.iteritems(): + if rh.rack_name == _candidate.rack_name: + if len(rh.candidate_host_types) > 0: + return True + + metadatas = filter_utils.aggregate_metadata_get_by_host(_level, _candidate) + + for extra_specs in _v.extra_specs_list: + for gk, metadata in metadatas.iteritems(): + if self._match_metadata(gk, extra_specs, metadata): + break + else: + return False + + return True + + def _match_metadata(self, _g_name, _extra_specs, _metadata): + """Match conditions + - No extra_specs + - Different SCOPE of extra_specs keys + - key of extra_specs exists in metadata & any value matches + """ + + for key, req in six.iteritems(_extra_specs): + scope = key.split(':', 1) + if len(scope) > 1: + if scope[0] != _SCOPE: + continue + else: + del scope[0] + key = scope[0] + + aggregate_vals = _metadata.get(key, None) + if not aggregate_vals: + return False + + for aggregate_val in aggregate_vals: + if filter_utils.match(aggregate_val, req): + break + else: + return False + + return True diff --git a/engine/src/valet/engine/search/filters/az_filter.py b/engine/src/valet/engine/search/filters/az_filter.py new file mode 100644 index 0000000..8902893 --- /dev/null +++ b/engine/src/valet/engine/search/filters/az_filter.py @@ -0,0 +1,74 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +import filter_utils + +from valet.engine.app_manager.group import Group +from valet.engine.app_manager.server import Server + + +class AvailabilityZoneFilter(object): + """Filters Hosts by availability zone. 
+ + Works with aggregate metadata availability zones, using the key 'availability_zone'. + + Note: in theory a compute node can be part of multiple availability_zones + """ + + def __init__(self): + self.name = "availability-zone" + + self.status = None + + def init_condition(self): + self.status = None + + def check_pre_condition(self, _level, _v, _avail_hosts, _avail_groups): + if (isinstance(_v, Server) and _v.availability_zone is not None) or \ + (isinstance(_v, Group) and len(_v.availability_zone_list) > 0): + return True + else: + return False + + def filter_candidates(self, _level, _v, _candidate_list): + candidate_list = [] + + for c in _candidate_list: + if self._check_candidate(_level, _v, c): + candidate_list.append(c) + + return candidate_list + + def _check_candidate(self, _level, _v, _candidate): + az_request_list = [] + if isinstance(_v, Server): + az_request_list.append(_v.availability_zone) + else: + for az in _v.availability_zone_list: + az_request_list.append(az) + + if len(az_request_list) == 0: + return True + + availability_zone_list = filter_utils.availability_zone_get_by_host(_level, _candidate) + + for azr in az_request_list: + if azr not in availability_zone_list: + return False + + return True diff --git a/engine/src/valet/engine/search/filters/cpu_filter.py b/engine/src/valet/engine/search/filters/cpu_filter.py new file mode 100644 index 0000000..95dba8c --- /dev/null +++ b/engine/src/valet/engine/search/filters/cpu_filter.py @@ -0,0 +1,57 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +class CPUFilter(object): + + def __init__(self): + self.name = "cpu" + + self.status = None + + def init_condition(self): + self.status = None + + def check_pre_condition(self, _level, _v, _avail_hosts, _avail_groups): + return True + + def filter_candidates(self, _level, _v, _candidate_list): + candidate_list = [] + + for c in _candidate_list: + if self._check_candidate(_level, _v, c): + candidate_list.append(c) + + return candidate_list + + def _check_candidate(self, _level, _v, _candidate): + """Return True if host has sufficient CPU cores.""" + + avail_vcpus = _candidate.get_vcpus(_level) + + instance_vcpus = _v.vCPUs + + # TODO: need to check against original CPUs? + # Do not allow an instance to overcommit against itself, + # only against other instances. 
+ # if instance_vcpus > vCPUs: + # return False + + if avail_vcpus < instance_vcpus: + return False + + return True diff --git a/engine/src/valet/engine/search/filters/disk_filter.py b/engine/src/valet/engine/search/filters/disk_filter.py new file mode 100644 index 0000000..00fa93e --- /dev/null +++ b/engine/src/valet/engine/search/filters/disk_filter.py @@ -0,0 +1,50 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +class DiskFilter(object): + + def __init__(self): + self.name = "disk" + + self.status = None + + def init_condition(self): + self.status = None + + def check_pre_condition(self, _level, _v, _avail_hosts, _avail_groups): + return True + + def filter_candidates(self, _level, _v, _candidate_list): + candidate_list = [] + + for c in _candidate_list: + if self._check_candidate(_level, _v, c): + candidate_list.append(c) + + return candidate_list + + def _check_candidate(self, _level, _v, _candidate): + """Filter based on disk usage.""" + + requested_disk = _v.local_volume_size + usable_disk = _candidate.get_local_disk(_level) + + if not usable_disk >= requested_disk: + return False + + return True diff --git a/engine/src/valet/engine/search/filters/diversity_filter.py b/engine/src/valet/engine/search/filters/diversity_filter.py new file mode 100644 index 0000000..882e11a --- /dev/null +++ b/engine/src/valet/engine/search/filters/diversity_filter.py @@ -0,0 +1,62 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# ------------------------------------------------------------------------- +# +class DiversityFilter(object): + + def __init__(self): + self.name = "diversity" + + self.diversity_list = [] + + self.status = None + + def init_condition(self): + self.diversity_list = [] + self.status = None + + def check_pre_condition(self, _level, _v, _avail_hosts, _avail_groups): + if len(_v.diversity_groups) > 0: + for _, div_group in _v.diversity_groups.iteritems(): + if div_group.level == _level: + self.diversity_list.append(div_group.vid) + + if len(self.diversity_list) > 0: + return True + else: + return False + + def filter_candidates(self, _level, _v, _candidate_list): + candidate_list = [] + + for c in _candidate_list: + if self._check_candidate(_level, c): + candidate_list.append(c) + + return candidate_list + + def _check_candidate(self, _level, _candidate): + """Filter based on named diversity groups.""" + + memberships = _candidate.get_memberships(_level) + + for diversity_id in self.diversity_list: + for gk, gr in memberships.iteritems(): + if gr.group_type == "diversity" and gk == diversity_id: + return False + + return True diff --git a/engine/src/valet/engine/search/filters/dynamic_aggregate_filter.py b/engine/src/valet/engine/search/filters/dynamic_aggregate_filter.py new file mode 100644 index 0000000..709a9b9 --- /dev/null +++ b/engine/src/valet/engine/search/filters/dynamic_aggregate_filter.py @@ -0,0 +1,141 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# ------------------------------------------------------------------------- +# +import copy + +from valet.engine.app_manager.server import Server +from valet.engine.search.filters.aggregate_instance_filter import AggregateInstanceExtraSpecsFilter +from valet.engine.search.filters.cpu_filter import CPUFilter +from valet.engine.search.filters.disk_filter import DiskFilter +from valet.engine.search.filters.mem_filter import MemFilter +from valet.engine.search.filters.numa_filter import NUMAFilter + + +class DynamicAggregateFilter(object): + + def __init__(self): + self.name = "dynamic-aggregate" + + self.avail_hosts = {} + self.avail_groups = {} + + self.aggr_filter = AggregateInstanceExtraSpecsFilter() + self.cpu_filter = CPUFilter() + self.mem_filter = MemFilter() + self.disk_filter = DiskFilter() + self.numa_filter = NUMAFilter() + + self.status = None + + def init_condition(self): + self.avail_hosts = {} + self.avail_groups = {} + self.status = None + + def check_pre_condition(self, _level, _v, _avail_hosts, _avail_groups): + if _level == "host" and isinstance(_v, Server): + self.avail_hosts = _avail_hosts + self.avail_groups = _avail_groups + return True + else: + return False + + def filter_candidates(self, _level, _v, _candidate_list): + specified_candidate_list = [] # candidates having specific host type + unspecified_candidate_list = [] # candidates not having specific host type + + for c in _candidate_list: + if len(c.candidate_host_types) == 0: + specified_candidate_list.append(c) + else: + unspecified_candidate_list.append(c) + + # Try to use existing hosts that have specific host type + if len(specified_candidate_list) > 0: + return specified_candidate_list + + # Take just one candidate + candidate = unspecified_candidate_list[0] + + # Get the host-aggregate of _v + flavor_type_list = _v.get_flavor_types() + if len(flavor_type_list) > 1: + self.status = "have more than one flavor type" + return [] + + ha = self.avail_groups[flavor_type_list[0]] + + # Add the host-aggregate into host and rack memberships. + # Adjust host with avail cpus, mem, disk, and numa + candidate.adjust_avail_resources(ha) + + # Change all others in the same rack. 
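        # Each host keeps its own copy of the rack-level availability
        # figures, so once the candidate's host type is fixed, the same
        # aggregate adjustment is applied to every other host in the
        # candidate's rack to keep those cached rack totals consistent.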
+ for hrk, hr in self.avail_hosts.iteritems(): + if hrk != candidate.host_name: + if hr.rack_name == candidate.rack_name: + hr.adjust_avail_rack_resources(ha, + candidate.rack_avail_vCPUs, + candidate.rack_avail_mem, + candidate.rack_avail_local_disk) + + # Once the host type (ha) is determined, remove candidate_host_types + candidate.old_candidate_host_types = copy.deepcopy(candidate.candidate_host_types) + candidate.candidate_host_types.clear() + + # Filter against host-aggregate, cpu, mem, disk, numa + + self.aggr_filter.init_condition() + if self.aggr_filter.check_pre_condition(_level, _v, self.avail_hosts, self.avail_groups): + if not self.aggr_filter._check_candidate(_level, _v, candidate): + self.status = "host-aggregate violation" + + self.cpu_filter.init_condition() + if not self.cpu_filter._check_candidate(_level, _v, candidate): + self.status = "cpu violation" + + self.mem_filter.init_condition() + if not self.mem_filter._check_candidate(_level, _v, candidate): + self.status = "mem violation" + + self.disk_filter.init_condition() + if not self.disk_filter._check_candidate(_level, _v, candidate): + self.status = "disk violation" + + self.numa_filter.init_condition() + if self.numa_filter.check_pre_condition(_level, _v, self.avail_hosts, self.avail_groups): + if not self.numa_filter._check_candidate(_level, _v, candidate): + self.status = "numa violation" + + if self.status is None: + # Candidate not filtered. + return [candidate] + else: + # Rollback + candidate.rollback_avail_resources(ha) + candidate.candidate_host_types = copy.deepcopy(candidate.old_candidate_host_types) + candidate.old_candidate_host_types.clear() + + for hrk, hr in self.avail_hosts.iteritems(): + if hrk != candidate.host_name: + if hr.rack_name == candidate.rack_name: + hr.rollback_avail_rack_resources(ha, + candidate.rack_avail_vCPUs, + candidate.rack_avail_mem, + candidate.rack_avail_local_disk) + + return [] diff --git a/engine/src/valet/engine/search/filters/exclusivity_filter.py b/engine/src/valet/engine/search/filters/exclusivity_filter.py new file mode 100644 index 0000000..77efd97 --- /dev/null +++ b/engine/src/valet/engine/search/filters/exclusivity_filter.py @@ -0,0 +1,81 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# ------------------------------------------------------------------------- +# +class ExclusivityFilter(object): + + def __init__(self): + self.name = "exclusivity" + + self.exclusivity_id = None + + self.status = None + + def init_condition(self): + self.exclusivity_id = None + self.status = None + + def check_pre_condition(self, _level, _v, _avail_hosts, _avail_groups): + exclusivities = _v.get_exclusivities(_level) + + if len(exclusivities) > 1: + self.status = "multiple exclusivities for node = " + _v.vid + return False + + if len(exclusivities) == 1: + ex_group = exclusivities[exclusivities.keys()[0]] + + if ex_group.level == _level: + self.exclusivity_id = ex_group.vid + + if self.exclusivity_id is not None: + return True + else: + return False + + def filter_candidates(self, _level, _v, _candidate_list): + + candidate_list = self._get_candidates(_level, _candidate_list) + + return candidate_list + + def _get_candidates(self, _level, _candidate_list): + candidate_list = [] + + for c in _candidate_list: + if self._check_exclusive_candidate(_level, c) is True or \ + self._check_empty(_level, c) is True: + candidate_list.append(c) + + return candidate_list + + def _check_exclusive_candidate(self, _level, _candidate): + memberships = _candidate.get_memberships(_level) + + for gk, gr in memberships.iteritems(): + if gr.group_type == "exclusivity" and gk == self.exclusivity_id: + return True + + return False + + def _check_empty(self, _level, _candidate): + num_of_placed_servers = _candidate.get_num_of_placed_servers(_level) + + if num_of_placed_servers == 0: + return True + + return False diff --git a/engine/src/valet/engine/search/filters/filter_utils.py b/engine/src/valet/engine/search/filters/filter_utils.py new file mode 100644 index 0000000..1005263 --- /dev/null +++ b/engine/src/valet/engine/search/filters/filter_utils.py @@ -0,0 +1,117 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +import collections +import operator + + +# 1. The following operations are supported: +# =, s==, s!=, s>=, s>, s<=, s<, <in>, <all-in>, <or>, ==, !=, >=, <= +# 2. Note that <or> is handled in a different way below. +# 3. If the first word in the extra_specs is not one of the operators, +# it is ignored. 
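# 4. Examples (illustrative only, based on the operator table below):
#      match("4", ">= 2")          -> True   (numeric comparison)
#      match("ssd", "s== ssd")     -> True   (string comparison)
#      match("a", "<or> x <or> a") -> True
#      match("x", "plain")         -> False  (no operator: exact match against
#                                             the whole string)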
+op_methods = {'=': lambda x, y: float(x) >= float(y), + '<in>': lambda x, y: y in x, + '<all-in>': lambda x, y: all(val in x for val in y), + '==': lambda x, y: float(x) == float(y), + '!=': lambda x, y: float(x) != float(y), + '>=': lambda x, y: float(x) >= float(y), + '<=': lambda x, y: float(x) <= float(y), + 's==': operator.eq, + 's!=': operator.ne, + 's<': operator.lt, + 's<=': operator.le, + 's>': operator.gt, + 's>=': operator.ge} + + +def match(value, req): + words = req.split() + + op = method = None + if words: + op = words.pop(0) + method = op_methods.get(op) + + if op != '<or>' and not method: + return value == req + + if value is None: + return False + + if op == '<or>': # Ex: <or> v1 <or> v2 <or> v3 + while True: + if words.pop(0) == value: + return True + if not words: + break + words.pop(0) # remove a keyword <or> + if not words: + break + return False + + if words: + if op == '<all-in>': # requires a list not a string + return method(value, words) + return method(value, words[0]) + return False + + +def aggregate_metadata_get_by_host(_level, _host, _key=None): + """Returns a dict of all metadata based on a metadata key for a specific host. + + If the key is not provided, returns a dict of all metadata. + """ + + metadatas = {} + + groups = _host.get_memberships(_level) + + for gk, g in groups.iteritems(): + if g.group_type == "aggr": + if _key is None or _key in g.metadata: + metadata = collections.defaultdict(set) + for k, v in g.metadata.items(): + if k != "prior_metadata": + metadata[k].update(x.strip() for x in v.split(',')) + else: + # metadata[k] = v + if isinstance(g.metadata["prior_metadata"], dict): + for ik, iv in g.metadata["prior_metadata"].items(): + metadata[ik].update(y.strip() for y in iv.split(',')) + metadatas[gk] = metadata + + return metadatas + + +def availability_zone_get_by_host(_level, _host): + availability_zone_list = [] + + groups = _host.get_memberships(_level) + + for gk, g in groups.iteritems(): + if g.group_type == "az": + g_name_elements = gk.split(':', 1) + if len(g_name_elements) > 1: + g_name = g_name_elements[1] + else: + g_name = gk + + availability_zone_list.append(g_name) + + return availability_zone_list diff --git a/engine/src/valet/engine/search/filters/mem_filter.py b/engine/src/valet/engine/search/filters/mem_filter.py new file mode 100644 index 0000000..1b494c2 --- /dev/null +++ b/engine/src/valet/engine/search/filters/mem_filter.py @@ -0,0 +1,56 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# ------------------------------------------------------------------------- +# +class MemFilter(object): + + def __init__(self): + self.name = "mem" + + self.status = None + + def init_condition(self): + self.status = None + + def check_pre_condition(self, _level, _v, _avail_hosts, _avail_groups): + return True + + def filter_candidates(self, _level, _v, _candidate_list): + candidate_list = [] + + for c in _candidate_list: + if self._check_candidate(_level, _v, c): + candidate_list.append(c) + + return candidate_list + + def _check_candidate(self, _level, _v, _candidate): + """Only return hosts with sufficient available RAM.""" + + requested_ram = _v.mem # MB + usable_ram = _candidate.get_mem(_level) + + # TODO: need to check against original mem_cap? + # Do not allow an instance to overcommit against itself, + # only against other instances. + # if not total_ram >= requested_ram: + # return False + + if not usable_ram >= requested_ram: + return False + + return True diff --git a/engine/src/valet/engine/search/filters/no_exclusivity_filter.py b/engine/src/valet/engine/search/filters/no_exclusivity_filter.py new file mode 100644 index 0000000..43516fe --- /dev/null +++ b/engine/src/valet/engine/search/filters/no_exclusivity_filter.py @@ -0,0 +1,53 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +class NoExclusivityFilter(object): + + def __init__(self): + self.name = "no-exclusivity" + + self.status = None + + def init_condition(self): + self.status = None + + def check_pre_condition(self, _level, _v, _avail_hosts, _avail_groups): + exclusivities = _v.get_exclusivities(_level) + + if len(exclusivities) == 0: + return True + else: + return False + + def filter_candidates(self, _level, _v, _candidate_list): + candidate_list = [] + + for c in _candidate_list: + if self._check_candidate(_level, c): + candidate_list.append(c) + + return candidate_list + + def _check_candidate(self, _level, _candidate): + memberships = _candidate.get_memberships(_level) + + for _, g in memberships.iteritems(): + if g.group_type == "exclusivity" and g.level == _level: + return False + + return True diff --git a/engine/src/valet/engine/search/filters/numa_filter.py b/engine/src/valet/engine/search/filters/numa_filter.py new file mode 100644 index 0000000..3e095ec --- /dev/null +++ b/engine/src/valet/engine/search/filters/numa_filter.py @@ -0,0 +1,84 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +from valet.engine.app_manager.server import Server + + +_SCOPE = 'hw' + + +class NUMAFilter(object): + """Check NUMA alignment request in Flavor.""" + + def __init__(self): + """Define filter name and status.""" + + self.name = "numa" + + self.status = None + + def init_condition(self): + """Init variable.""" + + self.status = None + + def check_pre_condition(self, _level, _v, _avail_hosts, _avail_groups): + """Check if given server needs to check this filter.""" + + if _level == "host" and isinstance(_v, Server): + if _v.need_numa_alignment(): + return True + + return False + + def filter_candidates(self, _level, _v, _candidate_list): + """Check and filter one candidate at a time.""" + + candidate_list = [] + + for c in _candidate_list: + if self._check_candidate(_level, _v, c): + candidate_list.append(c) + + return candidate_list + + def _check_candidate(self, _level, _v, _candidate): + """Check given candidate host if it meets numa requirement.""" + + # servers = [] + # if isinstance(_v, Group): + # _v.get_servers(servers) + # else: + # servers.append(_v) + + # (vcpus_demand, mem_demand) = self._get_demand_with_numa(servers) + + return _candidate.NUMA.has_enough_resources(_v.vCPUs, _v.mem) + + def _get_demand_with_numa(self, _servers): + """Check numa and compute the amount of vCPUs and memory.""" + + vcpus = 0 + mem = 0 + + for s in _servers: + if s.need_numa_alignment(): + vcpus += s.vCPUs + mem += s.mem + + return vcpus, mem diff --git a/engine/src/valet/engine/search/filters/quorum_diversity_filter.py b/engine/src/valet/engine/search/filters/quorum_diversity_filter.py new file mode 100644 index 0000000..6388ebc --- /dev/null +++ b/engine/src/valet/engine/search/filters/quorum_diversity_filter.py @@ -0,0 +1,106 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
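# Note on the quorum rule implemented below (illustrative arithmetic only):
# with 5 servers in total under a quorum-diversity group,
#   quorum = max(ceil(5 / 2.0 - 1.0), 1.0) = max(ceil(1.5), 1.0) = 2
# so any candidate already hosting 2 or more of the group's servers is
# filtered out.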
+# +# ------------------------------------------------------------------------- +# +import math + + +class QuorumDiversityFilter(object): + + def __init__(self): + self.name = "quorum-diversity" + + self.quorum_diversity_group_list = [] + + self.status = None + + def init_condition(self): + self.quorum_diversity_group_list = [] + self.status = None + + def check_pre_condition(self, _level, _v, _avail_hosts, _avail_groups): + if len(_v.quorum_diversity_groups) > 0: + for _, qdiv_group in _v.quorum_diversity_groups.iteritems(): + if qdiv_group.level == _level: + self.quorum_diversity_group_list.append(qdiv_group) + + if len(self.quorum_diversity_group_list) > 0: + return True + else: + return False + + def filter_candidates(self, _level, _v, _candidate_list): + candidate_list = [] + + # First, try diversity rule. + + for c in _candidate_list: + if self._check_diversity_candidate(_level, c): + candidate_list.append(c) + + if len(candidate_list) > 0: + return candidate_list + + # Second, if no available hosts for diversity rule, try quorum rule. + + for c in _candidate_list: + if self._check_quorum_candidate(_level, c): + candidate_list.append(c) + + return candidate_list + + def _check_diversity_candidate(self, _level, _candidate): + """Filter based on named diversity groups.""" + + memberships = _candidate.get_memberships(_level) + + for qdiv in self.quorum_diversity_group_list: + for gk, gr in memberships.iteritems(): + if gr.group_type == "quorum-diversity" and gk == qdiv.vid: + return False + + return True + + def _check_quorum_candidate(self, _level, _candidate): + """Filter based on quorum-diversity rule.""" + + memberships = _candidate.get_memberships(_level) + hk = _candidate.get_resource_name(_level) + + for qdiv in self.quorum_diversity_group_list: + # Requested num of servers under this rule + total_num_of_servers = len(qdiv.server_list) + + num_of_placed_servers_in_candidate = -1 + + for gk, gr in memberships.iteritems(): + if gr.group_type == "quorum-diversity" and gk == qdiv.vid: + # Total num of servers under this rule + total_num_of_servers += gr.original_num_of_placed_servers + + if hk in gr.num_of_placed_servers_of_host.keys(): + num_of_placed_servers_in_candidate = gr.num_of_placed_servers_of_host[hk] + + break + + # Allowed maximum num of servers per host + quorum = max(math.ceil(float(total_num_of_servers) / 2.0 - 1.0), 1.0) + + if num_of_placed_servers_in_candidate >= quorum: + return False + + return True diff --git a/engine/src/valet/engine/search/optimizer.py b/engine/src/valet/engine/search/optimizer.py new file mode 100644 index 0000000..de45cee --- /dev/null +++ b/engine/src/valet/engine/search/optimizer.py @@ -0,0 +1,494 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# ------------------------------------------------------------------------- +# +from valet.engine.app_manager.group import Group +from valet.engine.app_manager.server import Server +from valet.engine.search.search import Search + + +class Optimizer(object): + """Optimizer to compute the optimal placements.""" + + def __init__(self, _logger): + self.logger = _logger + + self.search = Search(self.logger) + + def place(self, _app): + """Scheduling placements given app.""" + + _app.set_weight() + _app.set_optimization_priority() + + if self.search.place(_app) is True: + if _app.status == "ok": + self._set_app(_app, "create") + + self._set_resource(_app) + if _app.status != "ok": + return + else: + if _app.status == "ok": + _app.status = "failed" + + self._rollback_placements(_app) + + def update(self, _app): + """Update state of current placements.""" + + if _app.state == "delete": + self._update_app_for_delete(_app) + + self._update_resource(_app) + if _app.status != "ok": + return + else: + _app.status = "unknown state while updating app" + + def confirm(self, _app): + """Confirm prior request.""" + + if _app.state == "created": + self._update_app(_app) + if _app.status != "ok": + return + + self._update_resource(_app) + if _app.status != "ok": + return + elif _app.state == "deleted": + self._remove_resource(_app) + if _app.status != "ok": + return + else: + _app.status = "unknown state while updating app" + return + + def rollback(self, _app): + """Rollback prior decision.""" + + if _app.state == "created": + self._update_app(_app) + if _app.status != "ok": + return + + self._update_resource(_app) + if _app.status != "ok": + return + elif _app.state == "deleted": + self._remove_resource(_app) + if _app.status != "ok": + return + else: + _app.status = "unknown state while updating app" + + def _set_app(self, _app, _state): + """Update with assigned hosts.""" + + for v, p in self.search.node_placements.iteritems(): + if isinstance(v, Server): + v.host = p.host_name + if p.rack_name != "any": + v.host_group = p.rack_name + + host = self.search.avail_hosts[p.host_name] + + s_info = {} + if _app.app_id is None or _app.app_id == "none": + s_info["stack_id"] = "none" + else: + s_info["stack_id"] = _app.app_id + s_info["stack_name"] = _app.app_name + s_info["uuid"] = "none" + s_info["name"] = v.name + + v.numa = host.NUMA.pop_cell_of_server(s_info) + + v.state = _state + + # Put back servers from groups. + _app.reset_servers() + + def _update_app(self, _app): + """Update state of servers.""" + + for sk, s in _app.servers.iteritems(): + if s["host"] == "none": + continue + + s["state"] = _app.state + + host_name = s.get("host") + + host = None + if host_name in _app.resource.hosts.keys(): + host = _app.resource.hosts[host_name] + + s_info = {} + if _app.app_id is None or _app.app_id == "none": + s_info["stack_id"] = "none" + else: + s_info["stack_id"] = _app.app_id + s_info["stack_name"] = _app.app_name + s_info["uuid"] = "none" + s_info["name"] = s.get("name") + + # Check if the prior placements changed. + if host is None or \ + not host.is_available() or \ + not host.has_server(s_info): + _app.status = "server (" + sk + ") placement has been changed" + self.logger.error(_app.status) + + def _update_app_for_delete(self, _app): + """Check the prior placements and update state + + And update placements if they have been changed. 
+ """ + + for sk, s in _app.servers.iteritems(): + if s["host"] == "none": + continue + + s["state"] = _app.state + + host_name = s.get("host") + + host = None + if host_name in _app.resource.hosts.keys(): + host = _app.resource.hosts[host_name] + + s_info = {} + if _app.app_id is None or _app.app_id == "none": + s_info["stack_id"] = "none" + else: + s_info["stack_id"] = _app.app_id + s_info["stack_name"] = _app.app_name + s_info["uuid"] = "none" + s_info["name"] = s.get("name") + + # Check if the prior placements changed. + if host is None or \ + not host.is_available() or \ + not host.has_server(s_info): + self.logger.warning("server (" + sk + ") placement has been changed") + + new_host = _app.resource.get_host_of_server(s_info) + + if new_host is not None: + s["host"] = new_host.name + else: + s["host"] = "none" + self.logger.warning("server (" + sk + ") not exists") + + def _set_resource(self, _app): + """Update resource status based on new placements.""" + + # If host's type (i.e., host-aggregate) is not determined before, + # Convert/set host's type to one as specified in VM. + for v, p in self.search.node_placements.iteritems(): + if isinstance(v, Server): + # The host object p was deep copied, so use original object. + rh = self.search.avail_hosts[p.host_name] + + if rh.old_candidate_host_types is not None and len(rh.old_candidate_host_types) > 0: + flavor_type_list = v.get_flavor_types() + ha = self.search.avail_groups[flavor_type_list[0]] + + self._convert_host(rh, + ha.name, + rh.get_host_type(ha, rh.old_candidate_host_types), + _app.resource) + + placements = {} + + for v, p in self.search.node_placements.iteritems(): + if isinstance(v, Server): + s_info = {} + + if _app.app_id is None or _app.app_id == "none": + s_info["stack_id"] = "none" + else: + s_info["stack_id"] = _app.app_id + s_info["stack_name"] = _app.app_name + + s_info["uuid"] = "none" + s_info["orch_id"] = v.orch_id + s_info["name"] = v.name + + s_info["flavor_id"] = v.flavor + s_info["vcpus"] = v.vCPUs + s_info["mem"] = v.mem + s_info["disk"] = v.local_volume_size + s_info["numa"] = v.numa + + s_info["image_id"] = v.image + s_info["tenant_id"] = _app.tenant_id + + s_info["state"] = v.state + s_info["status"] = "valid" + + placements[v.vid] = {} + placements[v.vid]["new_host"] = p.host_name + placements[v.vid]["info"] = s_info + + # Update compute host with new servers + if not _app.resource.update_server_placements(change_of_placements=placements): + _app.status = "fail while updating server placements" + return + + groups = {} + + for v, p in self.search.node_placements.iteritems(): + if isinstance(v, Server): + rh = self.search.avail_hosts[p.host_name] + + for gk, g in rh.host_memberships.iteritems(): + if g.factory in ("valet", "server-group"): + if g.level == "host": + _app.resource.add_group(gk, + g.group_type, + g.level, + g.factory, + rh.host_name) + + if rh.rack_name != "any": + for gk, g in rh.rack_memberships.iteritems(): + if g.factory in ("valet", "server-group"): + if g.level == "rack": + _app.resource.add_group(gk, + g.group_type, + g.level, + g.factory, + rh.rack_name) + + s_info = placements[v.vid].get("info") + + self._collect_groups_of_server(v, s_info, groups) + + # Update groups with new servers + _app.resource.update_server_grouping(change_of_placements=placements, + new_groups=groups) + + _app.resource.update_resource() + + def _convert_host(self, _rhost, _ha_name, _host_type, _resource): + """Convert host's type into the specific type as given.""" + + host = 
_resource.hosts[_rhost.host_name] + + if host.candidate_host_types is None or len(host.candidate_host_types) == 0: + return + + host.vCPUs = _host_type["vCPUs"] + host.original_vCPUs = _host_type["original_vCPUs"] + host.avail_vCPUs = _host_type["avail_vCPUs"] + host.mem_cap = _host_type["mem"] + host.original_mem_cap = _host_type["original_mem"] + host.avail_mem_cap = _host_type["avail_mem"] + host.local_disk_cap = _host_type["local_disk"] + host.original_local_disk_cap = _host_type["original_local_disk"] + host.avail_local_disk_cap = _host_type["avail_local_disk"] + host.vCPUs_used = _host_type["vCPUs_used"] + host.free_mem_mb = _host_type["free_mem_mb"] + host.free_disk_gb = _host_type["free_disk_gb"] + host.disk_available_least = _host_type["disk_available_least"] + + host.NUMA = _rhost.NUMA + + ha = _resource.groups[_ha_name] + host.memberships[ha.name] = ha + ha.member_hosts[host.name] = [] + ha.updated = True + + _resource.mark_host_updated(host.name) + + _resource.update_resource() + + if host.candidate_host_types is not None: + host.candidate_host_types.clear() + + def _rollback_placements(self, _app): + """Remove placements when they fail. + + Remove placements from NUMA cells of resource object. + """ + + for v, p in self.search.node_placements.iteritems(): + if isinstance(v, Server): + s_info = {} + + if _app.app_id is None or _app.app_id == "none": + s_info["stack_id"] = "none" + else: + s_info["stack_id"] = _app.app_id + s_info["stack_name"] = _app.app_name + + s_info["uuid"] = "none" + s_info["orch_id"] = v.orch_id + s_info["name"] = v.name + + s_info["flavor_id"] = v.flavor + s_info["vcpus"] = v.vCPUs + s_info["mem"] = v.mem + s_info["disk"] = v.local_volume_size + s_info["numa"] = v.numa + + s_info["image_id"] = v.image + s_info["tenant_id"] = _app.tenant_id + + s_info["state"] = v.state + s_info["status"] = "valid" + + host = _app.resource.hosts[p.host_name] + host.NUMA.rollback_server_resources(s_info) + + def _collect_groups_of_server(self, _v, _s_info, _groups): + """Collect all groups of the server and its parent (affinity).""" + + # TODO(Gueyoung): track host-aggregates and availability-zone? + + for gk in _v.exclusivity_groups.keys(): + if gk not in _groups.keys(): + _groups[gk] = [] + _groups[gk].append(_s_info) + + for gk in _v.diversity_groups.keys(): + if gk not in _groups.keys(): + _groups[gk] = [] + _groups[gk].append(_s_info) + + for gk in _v.quorum_diversity_groups.keys(): + if gk not in _groups.keys(): + _groups[gk] = [] + _groups[gk].append(_s_info) + + if isinstance(_v, Group): + if _v.vid not in _groups.keys(): + _groups[_v.vid] = [] + _groups[_v.vid].append(_s_info) + + # Recursively check server or its affinity group. + if _v.surgroup is not None: + self._collect_groups_of_server(_v.surgroup, _s_info, _groups) + + def _remove_resource(self, _app): + """Remove servers from resources. + + Resources: NUMA, host, host_group, datacenter, + valet groups and server-groups. 
+ """ + + placements = {} + + for sk, s in _app.servers.iteritems(): + if s["host"] == "none": + continue + + s_info = {} + + if _app.app_id is None or _app.app_id == "none": + s_info["stack_id"] = "none" + else: + s_info["stack_id"] = _app.app_id + s_info["stack_name"] = _app.app_name + + s_info["uuid"] = "none" + s_info["name"] = s.get("name") + + s_info["flavor_id"] = s.get("flavor") + s_info["vcpus"] = s.get("cpus") + s_info["mem"] = s.get("mem") + s_info["disk"] = s.get("local_volume") + s_info["numa"] = s.get("numa") + + s_info["image_id"] = s.get("image") + s_info["tenant_id"] = _app.tenant_id + + s_info["state"] = "deleted" + + if s_info["stack_id"] != "none": + sid = s_info["stack_id"] + ":" + s_info["name"] + else: + sid = s_info["stack_name"] + ":" + s_info["name"] + + placements[sid] = {} + placements[sid]["old_host"] = s.get("host") + placements[sid]["info"] = s_info + + if not _app.resource.update_server_placements(change_of_placements=placements): + _app.status = "fail while updating server placements" + return + + _app.resource.update_server_grouping(change_of_placements=placements, + new_groups={}) + + _app.resource.update_resource() + + def _update_resource(self, _app): + """Update state of servers in resources. + + Resources: NUMA, host, host_group, datacenter, + valet groups and server-groups. + """ + + placements = {} + + for sk, s in _app.servers.iteritems(): + if s["host"] == "none": + continue + + s_info = {} + + if _app.app_id is None or _app.app_id == "none": + s_info["stack_id"] = "none" + else: + s_info["stack_id"] = _app.app_id + s_info["stack_name"] = _app.app_name + + s_info["uuid"] = "none" + s_info["name"] = s.get("name") + + s_info["flavor_id"] = "none" + s_info["vcpus"] = -1 + s_info["mem"] = -1 + s_info["disk"] = -1 + s_info["numa"] = "none" + + s_info["image_id"] = "none" + + s_info["state"] = s.get("state") + s_info["status"] = "none" + + if s_info["stack_id"] != "none": + sid = s_info["stack_id"] + ":" + s_info["name"] + else: + sid = s_info["stack_name"] + ":" + s_info["name"] + + placements[sid] = {} + placements[sid]["host"] = s.get("host") + placements[sid]["info"] = s_info + + if not _app.resource.update_server_placements(change_of_placements=placements): + _app.status = "fail while updating server placements" + return + + _app.resource.update_server_grouping(change_of_placements=placements, + new_groups={}) + + _app.resource.update_resource() diff --git a/engine/src/valet/engine/search/resource.py b/engine/src/valet/engine/search/resource.py new file mode 100644 index 0000000..18c8ca7 --- /dev/null +++ b/engine/src/valet/engine/search/resource.py @@ -0,0 +1,264 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# ------------------------------------------------------------------------- +# +from valet.engine.resource_manager.resources.numa import NUMA + + +class GroupResource(object): + """Container for all resource group includes + + affinity, diversity, quorum-diversity, exclusivity, host-aggregate and availability. + """ + + def __init__(self): + self.name = None + + self.group_type = "aggr" + self.factory = "nova" + self.level = "host" + + self.metadata = {} + + self.original_num_of_placed_servers = 0 + self.num_of_placed_servers = 0 + + # key = host (host or rack), value = num_of_placed_servers + self.num_of_placed_servers_of_host = {} + + +class HostResource(object): + """Container for hosting resource (host, rack).""" + + def __init__(self): + # Host info + self.host_name = None + + self.host_memberships = {} # all mapped groups to host + + self.host_avail_vCPUs = 0 # remaining vCPUs after overcommit + self.host_avail_mem = 0 # remaining mem cap after + self.host_avail_local_disk = 0 # remaining local disk cap after overcommit + + self.NUMA = None + + self.host_num_of_placed_servers = 0 # the number of vms currently placed in this host + + # If the host type is not determined yet, + # provide possible host types. + self.candidate_host_types = {} + self.old_candidate_host_types = {} # For rollback + + # To track newly added host types. + self.new_host_aggregate_list = [] + + # Rack info + self.rack_name = None # where this host is located + + self.rack_memberships = {} + + self.rack_avail_vCPUs = 0 + self.rack_avail_mem = 0 + self.rack_avail_local_disk = 0 + + self.rack_num_of_placed_servers = 0 + + # To track newly added host types. + self.new_rack_aggregate_list = [] + + self.level = None # level of placement + + self.sort_base = 0 # order to place + + def get_host_type(self, _ha, _host_types): + """Take host-aggregate group and + return default host type of the host-aggregate. + """ + + host_type = None + + if _host_types is None: + return host_type + + host_type_list = _host_types[_ha.name] + for ht in host_type_list: + if "default" in ht.keys(): + host_type = ht + break + + return host_type + + def adjust_avail_resources(self, _ha): + """Take host-aggregate group and + add it to host/rack memberships and + adjust the amount of available resources based on + the corresponding host type. 
+ """ + + if _ha.name not in self.host_memberships.keys(): + self.host_memberships[_ha.name] = _ha + self.new_host_aggregate_list.append(_ha.name) + if _ha.name not in self.rack_memberships.keys(): + self.rack_memberships[_ha.name] = _ha + self.new_rack_aggregate_list.append(_ha.name) + + host_type = self.get_host_type(_ha, self.candidate_host_types) + + self.host_avail_vCPUs = host_type["avail_vCPUs"] + self.host_avail_mem = host_type["avail_mem"] + self.host_avail_local_disk = host_type["avail_local_disk"] + + self.NUMA = NUMA(numa=host_type["NUMA"]) + + if self.candidate_host_types is not None: + for htk, htl in self.candidate_host_types.iteritems(): + if htk == "mockup": + self.rack_avail_vCPUs -= htl[0]["avail_vCPUs"] + self.rack_avail_mem -= htl[0]["avail_mem"] + self.rack_avail_local_disk -= htl[0]["avail_local_disk"] + + self.rack_avail_vCPUs += self.host_avail_vCPUs + self.rack_avail_mem += self.host_avail_mem + self.rack_avail_local_disk += self.host_avail_local_disk + + break + + def adjust_avail_rack_resources(self, _ha, _cpus, _mem, _disk): + """Take host-aggregate group and the amount of available resources + add the group into rack membership and + adjust the amount of available rack resources. + """ + + if _ha.name not in self.rack_memberships.keys(): + self.rack_memberships[_ha.name] = _ha + self.new_rack_aggregate_list.append(_ha.name) + + self.rack_avail_vCPUs = _cpus + self.rack_avail_mem = _mem + self.rack_avail_local_disk = _disk + + def rollback_avail_resources(self, _ha): + if _ha.name in self.new_host_aggregate_list: + del self.host_memberships[_ha.name] + self.new_host_aggregate_list.remove(_ha.name) + if _ha.name in self.new_rack_aggregate_list: + del self.rack_memberships[_ha.name] + self.new_rack_aggregate_list.remove(_ha.name) + + host_type = self.get_host_type(_ha, self.old_candidate_host_types) + + if self.old_candidate_host_types is not None: + for htk, htl in self.old_candidate_host_types.iteritems(): + if htk == "mockup": + self.host_avail_vCPUs = htl[0]["avail_vCPUs"] + self.host_avail_mem = htl[0]["avail_mem"] + self.host_avail_local_disk = htl[0]["avail_local_disk"] + + self.NUMA = NUMA(numa=htl[0]["NUMA"]) + + self.rack_avail_vCPUs -= host_type["avail_vCPUs"] + self.rack_avail_mem -= host_type["avail_mem"] + self.rack_avail_local_disk -= host_type["avail_local_disk"] + + self.rack_avail_vCPUs += self.host_avail_vCPUs + self.rack_avail_mem += self.host_avail_mem + self.rack_avail_local_disk += self.host_avail_local_disk + + break + + def rollback_avail_rack_resources(self, _ha, _cpus, _mem, _disk): + if _ha.name in self.new_rack_aggregate_list: + del self.rack_memberships[_ha.name] + self.new_rack_aggregate_list.remove(_ha.name) + + self.rack_avail_vCPUs = _cpus + self.rack_avail_mem = _mem + self.rack_avail_local_disk = _disk + + def get_resource_name(self, _level): + name = "unknown" + + if _level == "rack": + name = self.rack_name + elif _level == "host": + name = self.host_name + + return name + + def get_vcpus(self, _level): + avail_vcpus = 0 + + if _level == "rack": + avail_vcpus = self.rack_avail_vCPUs + elif _level == "host": + avail_vcpus = self.host_avail_vCPUs + + return avail_vcpus + + def get_mem(self, _level): + avail_mem = 0 + + if _level == "rack": + avail_mem = self.rack_avail_mem + elif _level == "host": + avail_mem = self.host_avail_mem + + return avail_mem + + def get_local_disk(self, _level): + avail_local_disk = 0 + + if _level == "rack": + avail_local_disk = self.rack_avail_local_disk + elif _level == "host": + avail_local_disk = 
self.host_avail_local_disk + + return avail_local_disk + + def get_memberships(self, _level): + memberships = None + + if _level == "rack": + memberships = self.rack_memberships + elif _level == "host": + memberships = self.host_memberships + + return memberships + + def get_all_memberships(self, _level): + memberships = {} + + if _level == "rack": + for mk, m in self.rack_memberships.iteritems(): + memberships[mk] = m + for mk, m in self.host_memberships.iteritems(): + memberships[mk] = m + elif _level == "host": + for mk, m in self.host_memberships.iteritems(): + memberships[mk] = m + + return memberships + + def get_num_of_placed_servers(self, _level): + num_of_servers = 0 + + if _level == "rack": + num_of_servers = self.rack_num_of_placed_servers + elif _level == "host": + num_of_servers = self.host_num_of_placed_servers + + return num_of_servers diff --git a/engine/src/valet/engine/search/search.py b/engine/src/valet/engine/search/search.py new file mode 100644 index 0000000..40added --- /dev/null +++ b/engine/src/valet/engine/search/search.py @@ -0,0 +1,708 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +import copy +import operator + +from valet.engine.app_manager.server import Server +from valet.engine.resource_manager.resources.datacenter import Datacenter +from valet.engine.search.avail_resources import AvailResources +from valet.engine.search.constraint_solver import ConstraintSolver +from valet.engine.search.resource import GroupResource, HostResource +from valet.engine.search.search_helper import * + + +class Search(object): + """Bin-packing approach in the hierachical datacenter layout.""" + + def __init__(self, _logger): + self.logger = _logger + + # Search inputs + self.app = None + self.resource = None + + # Snapshot of current resource status + self.avail_hosts = {} + self.avail_groups = {} + + # Search results + self.node_placements = {} + self.prior_placements = {} # TODO + self.num_of_hosts = 0 + + # Optimization criteria + self.CPU_weight = -1 + self.mem_weight = -1 + self.local_disk_weight = -1 + + self.constraint_solver = None + + def _init_search(self, _app): + """Init the search information and the output results.""" + + self.app = _app + self.resource = _app.resource + + self.avail_hosts.clear() + self.avail_groups.clear() + + self.node_placements.clear() + self.prior_placements.clear() # TODO + self.num_of_hosts = 0 + + self.CPU_weight = -1 + self.mem_weight = -1 + self.local_disk_weight = -1 + + self.constraint_solver = ConstraintSolver(self.logger) + + self._create_avail_groups() + self._create_avail_hosts() + + # TODO + # if len(self.app.old_vm_map) > 0: + # self._adjust_resources() + + self._set_resource_weights() + + def _create_avail_groups(self): + """Collect all available resource groups. 
+ + Group type is affinity, diversity, quorum-diversity, exclusivity, + availability-zone, host-aggregate, server-group. + """ + + for gk, g in self.resource.groups.iteritems(): + if g.status != "enabled": + self.logger.debug("group (" + g.name + ") disabled") + continue + + gr = GroupResource() + gr.name = gk + + gr.group_type = g.group_type + gr.factory = g.factory + + if g.level is not None: + gr.level = g.level + else: + gr.level = "host" + + for mk, mv in g.metadata.iteritems(): + gr.metadata[mk] = mv + + gr.original_num_of_placed_servers = len(g.server_list) + gr.num_of_placed_servers = len(g.server_list) + + for hk in g.member_hosts.keys(): + gr.num_of_placed_servers_of_host[hk] = len(g.member_hosts[hk]) + + self.avail_groups[gk] = gr + + def _create_avail_hosts(self): + """Create all available hosts.""" + + for hk, host in self.resource.hosts.iteritems(): + if not host.is_available(): + self.logger.warning("host (" + host.name + ") not available at this time") + continue + + hr = HostResource() + hr.host_name = hk + + for mk in host.memberships.keys(): + if mk in self.avail_groups.keys(): + hr.host_memberships[mk] = self.avail_groups[mk] + + # Not used by Valet, only capacity planning + try: + for htk, ht in host.candidate_host_types.iteritems(): + hr.candidate_host_types[htk] = copy.deepcopy(ht) + except AttributeError: + hr.candidate_host_types = {} + + hr.host_avail_vCPUs = host.avail_vCPUs + hr.host_avail_mem = host.avail_mem_cap + hr.host_avail_local_disk = host.avail_local_disk_cap + + hr.NUMA = host.NUMA # NOTE: refer to host's NUMA, instead of deepcopy. + + hr.host_num_of_placed_servers = len(host.server_list) + + rack = host.host_group + if isinstance(rack, Datacenter): + hr.rack_name = "any" + else: + if not rack.is_available(): + continue + + hr.rack_name = rack.name + + for mk in rack.memberships.keys(): + if mk in self.avail_groups.keys(): + hr.rack_memberships[mk] = self.avail_groups[mk] + + hr.rack_avail_vCPUs = rack.avail_vCPUs + hr.rack_avail_mem = rack.avail_mem_cap + hr.rack_avail_local_disk = rack.avail_local_disk_cap + + hr.rack_num_of_placed_servers = len(rack.server_list) + + if hr.host_num_of_placed_servers > 0: + self.num_of_hosts += 1 + + self.avail_hosts[hk] = hr + + def _set_resource_weights(self): + """Compute weight of each resource type. + + As larger weight, as more important resource to be considered. + """ + + denominator = 0.0 + for _, w in self.app.optimization_priority: + denominator += w + + for t, w in self.app.optimization_priority: + if t == "cpu": + self.CPU_weight = float(w / denominator) + elif t == "mem": + self.mem_weight = float(w / denominator) + elif t == "lvol": + self.local_disk_weight = float(w / denominator) + + def place(self, _app): + """Determine placements of new app creation.""" + + self._init_search(_app) + + self.logger.info("search......") + + open_node_list = self._open_list(self.app.servers, self.app.groups) + + avail_resources = AvailResources(LEVEL[len(LEVEL) - 1]) + avail_resources.avail_hosts = self.avail_hosts + avail_resources.set_next_level() # NOTE(Gueyoung): skip 'cluster' level + + return self._run_greedy(open_node_list, avail_resources, "new") + + # TODO: for update opt. 
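+ # NOTE (illustration, not part of the original patch): assuming a
+ # hypothetical app whose optimization_priority is
+ # [("cpu", 3.0), ("mem", 2.0), ("lvol", 1.0)], _set_resource_weights()
+ # above yields CPU_weight = 0.5, mem_weight ~= 0.333 and
+ # local_disk_weight ~= 0.167; a server with vCPU_weight = 0.8,
+ # mem_weight = 0.5 and local_volume_weight = 0.1 then gets
+ # sort_base ~= 0.5*0.8 + 0.333*0.5 + 0.167*0.1 ~= 0.58 in
+ # _set_node_weight(), and _run_greedy() pops nodes in descending
+ # sort_base order. The stubs below are the placeholders referred to by
+ # the TODO above (re-optimizing placements of an existing app).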
+ def re_place(self, _app_topology): + pass + + def _re_place(self): + pass + + def _open_list(self, _servers, _groups): + """Extract all servers and groups of each level (rack, host).""" + + open_node_list = [] + + for _, s in _servers.iteritems(): + self._set_node_weight(s) + open_node_list.append(s) + + for _, g in _groups.iteritems(): + self._set_node_weight(g) + open_node_list.append(g) + + return open_node_list + + def _set_node_weight(self, _v): + """Compute each server's or group's weight. + + As larger weight, as more important one to be considered. + """ + + _v.sort_base = -1 + _v.sort_base = self.CPU_weight * _v.vCPU_weight + _v.sort_base += self.mem_weight * _v.mem_weight + _v.sort_base += self.local_disk_weight * _v.local_volume_weight + + # TODO: for update opt. + def _open_prior_list(self, _vms, _groups): + pass + + def _adjust_resources(self): + pass + + def _run_greedy(self, _open_node_list, _avail_resources, _mode): + """Search placements with greedy algorithm.""" + + _open_node_list.sort(key=operator.attrgetter("sort_base"), reverse=True) + + for n in _open_node_list: + self.logger.debug("open node = " + n.vid + " cpus = " + str(n.vCPUs) + " sort = " + str(n.sort_base)) + + while len(_open_node_list) > 0: + n = _open_node_list.pop(0) + + # TODO + # if _mode == "new": + best_resource = self._get_best_resource(n, _avail_resources, _mode) + # else: + # best_resource = self._get_best_resource_for_prior(n, _avail_resources, _mode) + + if best_resource is None: + self.logger.error(self.app.status) + return False + else: + self._deduct_resources(_avail_resources.level, best_resource, n, _mode) + # TODO + # if _mode == "new": + self._close_node_placement(_avail_resources.level, best_resource, n) + # else: + # self._close_prior_placement(_avail_resources.level, best_resource, n) + + return True + + def _get_best_resource(self, _n, _avail_resources, _mode): + """Ddetermine the best placement for given server or affinity group.""" + + candidate_list = [] + prior_resource = None + + # If this is already placed one + if _n in self.prior_placements.keys(): + prior_resource = _avail_resources.get_candidate(self.prior_placements[_n]) + candidate_list.append(prior_resource) + else: + # TODO: need a list of candidates given as input? 
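+ # Absent such an input list, the candidates are built from scratch:
+ # set_candidates() (AvailResources, not shown in this diff) prepares
+ # the hosts reachable at the current level, and the constraint solver
+ # runs its filters (e.g. the QuorumDiversityFilter above) to prune
+ # hosts that would violate a group rule. An empty result propagates
+ # the solver's status to the app; if several candidates survive,
+ # _set_compute_sort_base() ranks them by remaining-capacity ratios so
+ # that fuller hosts are tried first (a packing heuristic).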
+ + _avail_resources.set_candidates() + + candidate_list = self.constraint_solver.get_candidate_list(_n, + _avail_resources, + self.avail_hosts, + self.avail_groups) + + if len(candidate_list) == 0: + if self.app.status == "ok": + if self.constraint_solver.status != "ok": + self.app.status = self.constraint_solver.status + else: + self.app.status = "fail while getting candidate hosts" + return None + + if len(candidate_list) > 1: + self._set_compute_sort_base(_avail_resources.level, candidate_list) + candidate_list.sort(key=operator.attrgetter("sort_base")) + + for c in candidate_list: + rn = c.get_resource_name(_avail_resources.level) + avail_cpus = c.get_vcpus(_avail_resources.level) + self.logger.debug("candidate = " + rn + " cpus = " + str(avail_cpus) + " sort = " + str(c.sort_base)) + + best_resource = None + if _avail_resources.level == "host" and isinstance(_n, Server): + best_resource = copy.deepcopy(candidate_list[0]) + best_resource.level = "host" + else: + while len(candidate_list) > 0: + cr = candidate_list.pop(0) + + (servers, groups) = get_next_placements(_n, _avail_resources.level) + open_node_list = self._open_list(servers, groups) + + avail_resources = AvailResources(_avail_resources.level) + resource_name = cr.get_resource_name(_avail_resources.level) + + avail_resources.set_next_avail_hosts(_avail_resources.avail_hosts, resource_name) + avail_resources.set_next_level() + + # Recursive call + if self._run_greedy(open_node_list, avail_resources, _mode): + best_resource = copy.deepcopy(cr) + best_resource.level = _avail_resources.level + break + else: + if prior_resource is None: + self.logger.warning("rollback candidate = " + resource_name) + + self._rollback_resources(_n) + self._rollback_node_placement(_n) + + # TODO(Gueyoung): how to track the original error status? + if len(candidate_list) > 0 and self.app.status != "ok": + self.app.status = "ok" + else: + break + + if best_resource is None and len(candidate_list) == 0: + if self.app.status == "ok": + self.app.status = "no available hosts" + self.logger.warning(self.app.status) + + return best_resource + + # TODO: for update opt. + def _get_best_resource_for_prior(self, _n, _avail_resources, _mode): + pass + + def _set_compute_sort_base(self, _level, _candidate_list): + """Compute the weight of each candidate host.""" + + for c in _candidate_list: + cpu_ratio = -1 + mem_ratio = -1 + local_disk_ratio = -1 + + if _level == "rack": + cpu_ratio = float(c.rack_avail_vCPUs) / float(self.resource.CPU_avail) + mem_ratio = float(c.rack_avail_mem) / float(self.resource.mem_avail) + local_disk_ratio = float(c.rack_avail_local_disk) / float(self.resource.local_disk_avail) + elif _level == "host": + cpu_ratio = float(c.host_avail_vCPUs) / float(self.resource.CPU_avail) + mem_ratio = float(c.host_avail_mem) / float(self.resource.mem_avail) + local_disk_ratio = float(c.host_avail_local_disk) / float(self.resource.local_disk_avail) + + c.sort_base = (1.0 - self.CPU_weight) * cpu_ratio + \ + (1.0 - self.mem_weight) * mem_ratio + \ + (1.0 - self.local_disk_weight) * local_disk_ratio + + def _deduct_resources(self, _level, _best, _n, _mode): + """Apply new placement to hosting resources and groups.""" + + # Check if the chosen host is already applied. 
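+ # (I.e. skip the deduction when this node was already charged to a
+ # host at the same level -- for example an affinity group that was
+ # already handled in an outer pass of the recursive greedy search.)
+ # Past this guard, the placement is credited to every valet group the
+ # node belongs to (exclusivity, affinity, diversity, quorum-diversity)
+ # and, for a Server placed at host level, the host's vCPU/mem/disk and
+ # NUMA capacity are reduced in _deduct_server_resources() below.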
+ if _mode == "new": + if _n in self.prior_placements.keys(): + return + else: + if _n in self.node_placements.keys(): + if _best.level == self.node_placements[_n].level: + return + else: + if _n in self.prior_placements.keys(): + if _best.level == self.prior_placements[_n].level: + return + + # Apply this placement to valet groups. + + exclusivities = _n.get_exclusivities(_level) + if len(exclusivities) == 1: + exclusivity_group = exclusivities[exclusivities.keys()[0]] + self._add_exclusivity(_best, exclusivity_group) + + if isinstance(_n, Group): + self._add_group(_level, _best, _n) + + if len(_n.diversity_groups) > 0: + for _, div_group in _n.diversity_groups.iteritems(): + self._add_group(_level, _best, div_group) + + if len(_n.quorum_diversity_groups) > 0: + for _, div_group in _n.quorum_diversity_groups.iteritems(): + self._add_group(_level, _best, div_group) + + # Apply this placement to hosting resources. + if isinstance(_n, Server) and _level == "host": + self._deduct_server_resources(_best, _n) + + def _add_exclusivity(self, _best, _group): + """Add new exclusivity group.""" + + if _group.vid not in self.avail_groups.keys(): + gr = GroupResource() + gr.name = _group.vid + gr.group_type = "exclusivity" + gr.factory = "valet" + gr.level = _group.level + self.avail_groups[gr.name] = gr + + self.logger.info("find exclusivity (" + _group.vid + ")") + else: + gr = self.avail_groups[_group.vid] + + gr.num_of_placed_servers += 1 + + host_name = _best.get_resource_name(_group.level) + if host_name not in gr.num_of_placed_servers_of_host.keys(): + gr.num_of_placed_servers_of_host[host_name] = 0 + gr.num_of_placed_servers_of_host[host_name] += 1 + + chosen_host = self.avail_hosts[_best.host_name] + if _group.level == "host": + if _group.vid not in chosen_host.host_memberships.keys(): + chosen_host.host_memberships[_group.vid] = gr + for _, np in self.avail_hosts.iteritems(): + if chosen_host.rack_name != "any" and np.rack_name == chosen_host.rack_name: + if _group.vid not in np.rack_memberships.keys(): + np.rack_memberships[_group.vid] = gr + else: # Rack level + for _, np in self.avail_hosts.iteritems(): + if chosen_host.rack_name != "any" and np.rack_name == chosen_host.rack_name: + if _group.vid not in np.rack_memberships.keys(): + np.rack_memberships[_group.vid] = gr + + def _add_group(self, _level, _best, _group): + """Add new valet group.""" + + if _group.vid not in self.avail_groups.keys(): + gr = GroupResource() + gr.name = _group.vid + gr.group_type = _group.group_type + gr.factory = _group.factory + gr.level = _group.level + self.avail_groups[gr.name] = gr + + self.logger.info("find " + _group.group_type + " (" + _group.vid + ")") + else: + gr = self.avail_groups[_group.vid] + + if _group.level == _level: + gr.num_of_placed_servers += 1 + + host_name = _best.get_resource_name(_level) + if host_name not in gr.num_of_placed_servers_of_host.keys(): + gr.num_of_placed_servers_of_host[host_name] = 0 + gr.num_of_placed_servers_of_host[host_name] += 1 + + chosen_host = self.avail_hosts[_best.host_name] + if _level == "host": + if _group.vid not in chosen_host.host_memberships.keys(): + chosen_host.host_memberships[_group.vid] = gr + for _, np in self.avail_hosts.iteritems(): + if chosen_host.rack_name != "any" and np.rack_name == chosen_host.rack_name: + if _group.vid not in np.rack_memberships.keys(): + np.rack_memberships[_group.vid] = gr + else: # Rack level + for _, np in self.avail_hosts.iteritems(): + if chosen_host.rack_name != "any" and np.rack_name == chosen_host.rack_name: + if 
_group.vid not in np.rack_memberships.keys(): + np.rack_memberships[_group.vid] = gr + + def _deduct_server_resources(self, _best, _n): + """Apply the reduced amount of resources to the chosen host. + + _n is a server and _best is a compute host. + """ + + chosen_host = self.avail_hosts[_best.host_name] + + chosen_host.host_avail_vCPUs -= _n.vCPUs + chosen_host.host_avail_mem -= _n.mem + chosen_host.host_avail_local_disk -= _n.local_volume_size + + # Apply placement decision into NUMA + if _n.need_numa_alignment(): + s_info = {} + s_info["stack_id"] = "none" + s_info["stack_name"] = self.app.app_name + s_info["uuid"] = "none" + s_info["name"] = _n.name + s_info["vcpus"] = _n.vCPUs + s_info["mem"] = _n.mem + + chosen_host.NUMA.deduct_server_resources(s_info) + + # TODO: need non_NUMA server? + # else: + # chosen_host.NUMA.apply_cpus_fairly(_n.vCPUs) + # chosen_host.NUMA.apply_mem_fairly(_n.mem) + + if chosen_host.host_num_of_placed_servers == 0: + self.num_of_hosts += 1 + + chosen_host.host_num_of_placed_servers += 1 + + for _, np in self.avail_hosts.iteritems(): + if chosen_host.rack_name != "any" and np.rack_name == chosen_host.rack_name: + np.rack_avail_vCPUs -= _n.vCPUs + np.rack_avail_mem -= _n.mem + np.rack_avail_local_disk -= _n.local_volume_size + + np.rack_num_of_placed_servers += 1 + + def _close_node_placement(self, _level, _best, _v): + """Record the final placement decision.""" + + if _v not in self.node_placements.keys() and _v not in self.prior_placements.keys(): + if _level == "host" or isinstance(_v, Group): + self.node_placements[_v] = _best + + def _close_prior_placement(self, _level, _best, _v): + """Set the decision for placed server or group.""" + + if _v not in self.prior_placements.keys(): + if _level == "host" or isinstance(_v, Group): + self.prior_placements[_v] = _best + + def _rollback_resources(self, _v): + """Rollback the placement.""" + + if isinstance(_v, Server): + self._rollback_server_resources(_v) + elif isinstance(_v, Group): + for _, v in _v.subgroups.iteritems(): + self._rollback_resources(v) + + if _v in self.node_placements.keys(): + chosen_host = self.avail_hosts[self.node_placements[_v].host_name] + level = self.node_placements[_v].level + + if isinstance(_v, Group): + self._remove_group(chosen_host, _v, level) + + exclusivities = _v.get_exclusivities(level) + if len(exclusivities) == 1: + ex_group = exclusivities[exclusivities.keys()[0]] + self._remove_exclusivity(chosen_host, ex_group) + + if len(_v.diversity_groups) > 0: + for _, div_group in _v.diversity_groups.iteritems(): + self._remove_group(chosen_host, div_group, level) + + if len(_v.quorum_diversity_groups) > 0: + for _, div_group in _v.quorum_diversity_groups.iteritems(): + self._remove_group(chosen_host, div_group, level) + + def _remove_exclusivity(self, _chosen_host, _group): + """Remove the exclusivity group.""" + + gr = self.avail_groups[_group.vid] + + host_name = _chosen_host.get_resource_name(_group.level) + + gr.num_of_placed_servers -= 1 + gr.num_of_placed_servers_of_host[host_name] -= 1 + + if gr.num_of_placed_servers_of_host[host_name] == 0: + del gr.num_of_placed_servers_of_host[host_name] + + if gr.num_of_placed_servers == 0: + del self.avail_groups[_group.vid] + + if _group.level == "host": + if _chosen_host.host_num_of_placed_servers == 0 and \ + _group.vid in _chosen_host.host_memberships.keys(): + del _chosen_host.host_memberships[_group.vid] + + for _, np in self.avail_hosts.iteritems(): + if _chosen_host.rack_name != "any" and np.rack_name == _chosen_host.rack_name: + 
if _group.vid in np.rack_memberships.keys(): + del np.rack_memberships[_group.vid] + else: # Rack level + if _chosen_host.rack_num_of_placed_servers == 0: + for _, np in self.avail_hosts.iteritems(): + if _chosen_host.rack_name != "any" and np.rack_name == _chosen_host.rack_name: + if _group.vid in np.rack_memberships.keys(): + del np.rack_memberships[_group.vid] + + def _remove_group(self, _chosen_host, _group, _level): + """Remove valet group.""" + + if _group.level == _level: + gr = self.avail_groups[_group.vid] + + host_name = _chosen_host.get_resource_name(_level) + + gr.num_of_placed_servers -= 1 + gr.num_of_placed_servers_of_host[host_name] -= 1 + + if gr.num_of_placed_servers_of_host[host_name] == 0: + del gr.num_of_placed_servers_of_host[host_name] + + if gr.num_of_placed_servers == 0: + del self.avail_groups[_group.vid] + + exist_group = True + if _group.vid not in self.avail_groups.keys(): + exist_group = False + else: + if host_name not in gr.num_of_placed_servers_of_host.keys(): + exist_group = False + + if _level == "host": + if not exist_group and _group.vid in _chosen_host.host_memberships.keys(): + del _chosen_host.host_memberships[_group.vid] + + for _, np in self.avail_hosts.iteritems(): + if _chosen_host.rack_name != "any" and np.rack_name == _chosen_host.rack_name: + if _group.vid in np.rack_memberships.keys(): + del np.rack_memberships[_group.vid] + else: # Rack level + if not exist_group: + for _, np in self.avail_hosts.iteritems(): + if _chosen_host.rack_name != "any" and np.rack_name == _chosen_host.rack_name: + if _group.vid in np.rack_memberships.keys(): + del np.rack_memberships[_group.vid] + + def _rollback_server_resources(self, _v): + """Return back the amount of resources to host.""" + + if _v in self.node_placements.keys(): + chosen_host = self.avail_hosts[self.node_placements[_v].host_name] + + chosen_host.host_avail_vCPUs += _v.vCPUs + chosen_host.host_avail_mem += _v.mem + chosen_host.host_avail_local_disk += _v.local_volume_size + + # Apply rollback into NUMA + if _v.need_numa_alignment(): + s_info = {} + s_info["stack_id"] = "none" + s_info["stack_name"] = self.app.app_name + s_info["uuid"] = "none" + s_info["name"] = _v.name + s_info["vcpus"] = _v.vCPUs + s_info["mem"] = _v.mem + + chosen_host.NUMA.rollback_server_resources(s_info) + + chosen_host.host_num_of_placed_servers -= 1 + + if chosen_host.host_num_of_placed_servers == 0: + self.num_of_hosts -= 1 + + for _, np in self.avail_hosts.iteritems(): + if chosen_host.rack_name != "any" and np.rack_name == chosen_host.rack_name: + np.rack_avail_vCPUs += _v.vCPUs + np.rack_avail_mem += _v.mem + np.rack_avail_local_disk += _v.local_volume_size + + np.rack_num_of_placed_servers -= 1 + + # If the chosen host was a new host and its host type was unknown, + # rollback to the original unknown state. 
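+ # (Restoring the unknown state means: undo the host-aggregate that was
+ # provisionally attached for the server's flavor type, put the "mockup"
+ # capacity figures back via rollback_avail_resources(), restore
+ # candidate_host_types from the saved old_candidate_host_types copy,
+ # and push the corrected rack-level availability to the other hosts of
+ # the same rack via rollback_avail_rack_resources().)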
+ if chosen_host.host_num_of_placed_servers == 0: + if chosen_host.old_candidate_host_types is not None and len(chosen_host.old_candidate_host_types) > 0: + flavor_type_list = _v.get_flavor_types() + ha = self.avail_groups[flavor_type_list[0]] + + chosen_host.rollback_avail_resources(ha) + chosen_host.candidate_host_types = copy.deepcopy(chosen_host.old_candidate_host_types) + chosen_host.old_candidate_host_types.clear() + + for hrk, hr in self.avail_hosts.iteritems(): + if hrk != chosen_host.host_name: + if hr.rack_name == chosen_host.rack_name: + hr.rollback_avail_rack_resources(ha, + chosen_host.rack_avail_vCPUs, + chosen_host.rack_avail_mem, + chosen_host.rack_avail_local_disk) + + def _rollback_node_placement(self, _v): + """Remove placement decisions.""" + + if _v in self.node_placements.keys(): + del self.node_placements[_v] + + if isinstance(_v, Group): + for _, sg in _v.subgroups.iteritems(): + self._rollback_node_placement(sg) diff --git a/engine/src/valet/engine/search/search_helper.py b/engine/src/valet/engine/search/search_helper.py new file mode 100644 index 0000000..0e64ef7 --- /dev/null +++ b/engine/src/valet/engine/search/search_helper.py @@ -0,0 +1,43 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2019 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +#!/bin/python + + +from valet.engine.app_manager.group import Group, LEVEL + + +def get_next_placements(_n, _level): + """Get servers and groups to be handled in the next level of search.""" + + servers = {} + groups = {} + + if isinstance(_n, Group): + if LEVEL.index(_n.level) < LEVEL.index(_level): + groups[_n.vid] = _n + else: + for _, sg in _n.subgroups.iteritems(): + if isinstance(sg, Group): + groups[sg.vid] = sg + else: + servers[sg.vid] = sg + else: + servers[_n.vid] = _n + + return servers, groups |
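As a standalone illustration (not part of the patch), the quorum threshold enforced by QuorumDiversityFilter._check_quorum_candidate() above reduces to the arithmetic below, max(ceil(n/2 - 1), 1); the helper name and the sample group sizes are hypothetical.

import math

def quorum_threshold(total_num_of_servers):
    # Maximum number of servers of one quorum-diversity group that may be
    # placed on a single host (or rack): strictly fewer than a quorum of
    # the group, but never less than one.
    return max(math.ceil(float(total_num_of_servers) / 2.0 - 1.0), 1.0)

# e.g. a group of 2 servers allows 1 per host, 3 -> 1, 5 -> 2, 7 -> 3;
# a candidate host is rejected once it already holds that many.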